diff --git a/BotSharp.sln b/BotSharp.sln index 550d94ebf..a692f4036 100644 --- a/BotSharp.sln +++ b/BotSharp.sln @@ -143,6 +143,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.BrowserUse", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.LLM.Tests", "tests\BotSharp.LLM.Tests\BotSharp.LLM.Tests.csproj", "{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.RealtimeVoice", "tests\BotSharp.Test.RealtimeVoice\BotSharp.Test.RealtimeVoice.csproj", "{B067B126-88CD-4282-BEEF-7369B64423EF}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -599,6 +601,14 @@ Global {7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|Any CPU.Build.0 = Release|Any CPU {7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.ActiveCfg = Release|Any CPU {7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.Build.0 = Release|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.ActiveCfg = Debug|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.Build.0 = Debug|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.Build.0 = Release|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.ActiveCfg = Release|Any CPU + {B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -668,6 +678,7 @@ Global {970BE341-9AC8-99A5-6572-E703C1E02FCB} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749} {7D0DB012-9798-4BB9-B15B-A5B0B7B3B094} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC} {7C0C7D13-D161-4AB0-9C29-83A0F1FF990E} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC} + {B067B126-88CD-4282-BEEF-7369B64423EF} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19} diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Settings/AgentSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Settings/AgentSettings.cs index 232e7b60f..887c437ec 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Agents/Settings/AgentSettings.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Settings/AgentSettings.cs @@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Agents.Settings; public class AgentSettings { - public string DataDir { get; set; } = string.Empty; + public string DataDir { get; set; } = "agents"; public string TemplateFormat { get; set; } = "liquid"; public string HostAgentId { get; set; } = string.Empty; public bool EnableTranslator { get; set; } = false; diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs index bf29f5212..c4f131dc7 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs @@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Conversations.Settings; public class ConversationSetting { - public string DataDir { get; set; } + public string DataDir { get; set; } = "conversations"; public string ChatCompletion { get; set; } public bool EnableKnowledgeBase { get; set; } public bool ShowVerboseLog { get; set; } diff --git a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs index efbb3cec1..b3827c338 100644 --- a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs +++ b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs @@ -18,6 +18,7 @@ Task Connect(RealtimeHubConnection conn, Action onInputAudioTranscriptionCompleted, Action onUserInterrupted); Task AppenAudioBuffer(string message); + Task AppenAudioBuffer(ArraySegment data, int length); Task SendEventToModel(object message); Task Disconnect(); diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Enums/StreamChannelStatus.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Enums/StreamChannelStatus.cs new file mode 100644 index 000000000..18de754ec --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Enums/StreamChannelStatus.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Realtime.Enums; + +public enum StreamChannelStatus +{ + Open = 1, + Closed = 2 +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/IRealtimeHub.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/IRealtimeHub.cs index 23ecb6549..37e58f127 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/IRealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/IRealtimeHub.cs @@ -15,5 +15,5 @@ public interface IRealtimeHub IRealTimeCompletion Completer { get; } IRealTimeCompletion SetCompleter(string provider); - Task Listen(WebSocket userWebSocket, Action onUserMessageReceived); + Task ConnectToModel(Func responseToUser); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs new file mode 100644 index 000000000..4dfc3e7da --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs @@ -0,0 +1,13 @@ +using BotSharp.Abstraction.Realtime.Enums; +using BotSharp.Abstraction.Realtime.Models; +using System.Threading; + +namespace BotSharp.Abstraction.Realtime; + +public interface IStreamChannel +{ + Task ConnectAsync(string conversationId); + Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellation); + Task SendAsync(byte[] data, CancellationToken cancellation); + Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseEvent.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseEvent.cs new file mode 100644 index 000000000..38e4a082e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseEvent.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Realtime.Models; + +public class ModelResponseEvent +{ + [JsonPropertyName("event")] + public string Event { get; set; } = string.Empty; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseMediaEvent.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseMediaEvent.cs new file mode 100644 index 000000000..2694a1d51 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/ModelResponseMediaEvent.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Realtime.Models; + +public class ModelResponseMediaEvent : ModelResponseEvent +{ + [JsonPropertyName("media")] + public string Media { get; set; } = null!; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs index 362e61bf6..b9f654c65 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs @@ -4,7 +4,6 @@ namespace BotSharp.Abstraction.Realtime.Models; public class RealtimeHubConnection { - public string Event { get; set; } = null!; public string StreamId { get; set; } = null!; public string? LastAssistantItemId { get; set; } = null!; public long LatestMediaTimestamp { get; set; } @@ -13,10 +12,9 @@ public class RealtimeHubConnection public ConcurrentQueue MarkQueue { get; set; } = new(); public string CurrentAgentId { get; set; } = null!; public string ConversationId { get; set; } = null!; - public string Data { get; set; } = string.Empty; - public Func OnModelMessageReceived { get; set; } = null!; - public Func OnModelAudioResponseDone { get; set; } = null!; - public Func OnModelUserInterrupted { get; set; } = null!; + public Func OnModelMessageReceived { get; set; } = null!; + public Func OnModelAudioResponseDone { get; set; } = null!; + public Func OnModelUserInterrupted { get; set; } = null!; public void ResetResponseState() { diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeModelSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeModelSettings.cs index a1827ff01..2428bd5b2 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeModelSettings.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeModelSettings.cs @@ -2,6 +2,8 @@ namespace BotSharp.Abstraction.Realtime.Models; public class RealtimeModelSettings { + public string InputAudioFormat { get; set; } = "g711_ulaw"; + public string OutputAudioFormat { get; set; } = "g711_ulaw"; public string Voice { get; set; } = "alloy"; public float Temperature { get; set; } = 0.8f; public int MaxResponseOutputTokens { get; set; } = 512; diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/StreamReceiveResult.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/StreamReceiveResult.cs new file mode 100644 index 000000000..0dc3d03c8 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/StreamReceiveResult.cs @@ -0,0 +1,10 @@ +using BotSharp.Abstraction.Realtime.Enums; + +namespace BotSharp.Abstraction.Realtime.Models; + +public class StreamReceiveResult +{ + public StreamChannelStatus Status { get; set; } + public int Count { get; set; } + public bool EndOfMessage { get; } +} diff --git a/src/Infrastructure/BotSharp.Core.Realtime/BotSharp.Core.Realtime.csproj b/src/Infrastructure/BotSharp.Core.Realtime/BotSharp.Core.Realtime.csproj index 25218b089..6d69b0d7c 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/BotSharp.Core.Realtime.csproj +++ b/src/Infrastructure/BotSharp.Core.Realtime/BotSharp.Core.Realtime.csproj @@ -10,6 +10,10 @@ enable + + + + diff --git a/src/Infrastructure/BotSharp.Core.Realtime/RealtimePlugin.cs b/src/Infrastructure/BotSharp.Core.Realtime/RealtimePlugin.cs index fa9ee2684..5ed76aa3a 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/RealtimePlugin.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/RealtimePlugin.cs @@ -23,5 +23,6 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) services.AddScoped(); services.AddScoped(); + services.AddScoped(); } } diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs index 4cebb5b30..a1220d5a3 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs @@ -21,53 +21,9 @@ public RealtimeHub(IServiceProvider services, ILogger logger) _logger = logger; } - public async Task Listen(WebSocket userWebSocket, - Action onUserMessageReceived) + public async Task ConnectToModel(Func responseToUser) { - var buffer = new byte[1024 * 32]; - WebSocketReceiveResult result; - - do - { - Array.Clear(buffer, 0, buffer.Length); - result = await userWebSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count); - - if (string.IsNullOrEmpty(receivedText)) - { - continue; - } - - onUserMessageReceived(receivedText); - - if (_conn.Event == "user_connected") - { - await ConnectToModel(userWebSocket); - } - else if (_conn.Event == "user_data_received") - { - await _completer.AppenAudioBuffer(_conn.Data); - } - else if (_conn.Event == "user_dtmf_receiving") - { - } - else if (_conn.Event == "user_dtmf_received") - { - await HandleUserDtmfReceived(); - } - else if (_conn.Event == "user_disconnected") - { - await _completer.Disconnect(); - await HandleUserDisconnected(); - } - } while (!result.CloseStatus.HasValue); - - await userWebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); - } - - private async Task ConnectToModel(WebSocket userWebSocket) - { - var hookProvider = _services.GetRequiredService(); + var hookProvider = _services.GetService(); var convService = _services.GetRequiredService(); convService.SetConversationId(_conn.ConversationId, []); var conversation = await convService.GetConversation(_conn.ConversationId); @@ -103,7 +59,7 @@ await _completer.Connect(_conn, onModelAudioDeltaReceived: async (audioDeltaData, itemId) => { var data = _conn.OnModelMessageReceived(audioDeltaData); - await SendEventToUser(userWebSocket, data); + await responseToUser(data); // If this is the first delta of a new response, set the start timestamp if (!_conn.ResponseStartTimestamp.HasValue) @@ -118,12 +74,12 @@ await _completer.Connect(_conn, } // Send mark messages to Media Streams so we know if and when AI response playback is finished - await SendMark(userWebSocket, _conn); + // await SendMark(userWebSocket, _conn); }, onModelAudioResponseDone: async () => { var data = _conn.OnModelAudioResponseDone(); - await SendEventToUser(userWebSocket, data); + await responseToUser(data); }, onAudioTranscriptDone: async transcript => { @@ -151,7 +107,7 @@ await _completer.Connect(_conn, dialogs.Add(message); storage.Append(_conn.ConversationId, message); - foreach (var hook in hookProvider.HooksOrderByPriority) + foreach (var hook in hookProvider?.HooksOrderByPriority ?? []) { hook.SetAgent(agent) .SetConversation(conversation); @@ -172,7 +128,7 @@ await _completer.Connect(_conn, storage.Append(_conn.ConversationId, message); routing.Context.SetMessageId(_conn.ConversationId, message.MessageId); - foreach (var hook in hookProvider.HooksOrderByPriority) + foreach (var hook in hookProvider?.HooksOrderByPriority ?? []) { hook.SetAgent(agent) .SetConversation(conversation); @@ -186,69 +142,10 @@ await _completer.Connect(_conn, _conn.ResetResponseState(); var data = _conn.OnModelUserInterrupted(); - await SendEventToUser(userWebSocket, data); + await responseToUser(data); }); } - private async Task SendMark(WebSocket userWebSocket, RealtimeHubConnection conn) - { - if (!string.IsNullOrEmpty(conn.StreamId)) - { - var markEvent = new - { - @event = "mark", - streamSid = conn.StreamId, - mark = new { name = "responsePart" } - }; - await SendEventToUser(userWebSocket, markEvent); - conn.MarkQueue.Enqueue("responsePart"); - } - } - - private async Task HandleUserDtmfReceived() - { - var routing = _services.GetRequiredService(); - var hookProvider = _services.GetRequiredService(); - var agentService = _services.GetRequiredService(); - var agent = await agentService.LoadAgent(_conn.CurrentAgentId); - var dialogs = routing.Context.GetDialogs(); - var convService = _services.GetRequiredService(); - var conversation = await convService.GetConversation(_conn.ConversationId); - - var message = new RoleDialogModel(AgentRole.User, _conn.Data) - { - CurrentAgentId = routing.Context.GetCurrentAgentId() - }; - dialogs.Add(message); - - var storage = _services.GetRequiredService(); - storage.Append(_conn.ConversationId, message); - - foreach (var hook in hookProvider.HooksOrderByPriority) - { - hook.SetAgent(agent) - .SetConversation(conversation); - - await hook.OnMessageReceived(message); - } - - await _completer.InsertConversationItem(message); - var instruction = await _completer.UpdateSession(_conn); - await _completer.TriggerModelInference($"{instruction}\r\n\r\nReply based on the user input: {message.Content}"); - } - - private async Task HandleUserDisconnected() - { - - } - - private async Task SendEventToUser(WebSocket webSocket, object message) - { - var data = JsonSerializer.Serialize(message); - var buffer = Encoding.UTF8.GetBytes(data); - await webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Text, true, CancellationToken.None); - } - public RealtimeHubConnection SetHubConnection(string conversationId) { _conn = new RealtimeHubConnection diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStremChannel.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStremChannel.cs new file mode 100644 index 000000000..cca96b2c7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStremChannel.cs @@ -0,0 +1,104 @@ +using BotSharp.Abstraction.Realtime.Enums; +using NAudio.Wave; +using System.Collections.Concurrent; +using System.IO; + +namespace BotSharp.Core.Realtime.Services; + +public class WaveStremChannel : IStreamChannel +{ + private readonly IServiceProvider _services; + private WaveInEvent _waveIn; + private WaveOutEvent _waveOut; + private BufferedWaveProvider _bufferedWaveProvider; + private readonly ConcurrentQueue _audioBufferQueue = new ConcurrentQueue(); + private readonly ILogger _logger; + + public WaveStremChannel(IServiceProvider services, ILogger logger) + { + _services = services; + _logger = logger; + } + + public async Task ConnectAsync(string conversationId) + { + // Initialize the WaveInEvent + _waveIn = new WaveInEvent + { + DeviceNumber = 0, // Default recording device + WaveFormat = new WaveFormat(24000, 16, 1), // 24000 Hz, 16-bit PCM, Mono + BufferMilliseconds = 100 + }; + + // Set up the DataAvailable event handler + _waveIn.DataAvailable += WaveIn_DataAvailable; + + // Start recording + _waveIn.StartRecording(); + + // Initialize audio output for streaming + var waveFormat = new WaveFormat(24000, 16, 1); // 24000 Hz, 16-bit PCM, Mono + _bufferedWaveProvider = new BufferedWaveProvider(waveFormat); + _bufferedWaveProvider.BufferLength = 1024 * 512; // Buffer length + _bufferedWaveProvider.DiscardOnBufferOverflow = true; + + _waveOut = new WaveOutEvent(); + _waveOut.Init(_bufferedWaveProvider); + _waveOut.Play(); + } + + public async Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellation) + { + // Poll the queue until data is available or cancellation is requested + while (!cancellation.IsCancellationRequested) + { + // Try to dequeue audio data + if (_audioBufferQueue.TryDequeue(out byte[]? audioData)) + { + // Copy data to the provided buffer + int bytesToCopy = Math.Min(audioData.Length, buffer.Count); + Array.Copy(audioData, 0, buffer.Array, buffer.Offset, bytesToCopy); + + // Return the result + return new StreamReceiveResult + { + Status = StreamChannelStatus.Open, + Count = bytesToCopy + }; + } + + // No data available yet, wait a short time before checking again + await Task.Delay(10, cancellation); + } + + // Cancellation was requested + return new StreamReceiveResult(); + } + + public Task SendAsync(byte[] data, CancellationToken cancellation) + { + _logger.LogDebug($"Sending audio data of length {data.Length} to the stream channel."); + // Add the incoming data to the buffer for continuous playback + _bufferedWaveProvider.AddSamples(data, 0, data.Length); + return Task.CompletedTask; + } + + private void WaveIn_DataAvailable(object? sender, WaveInEventArgs e) + { + // Add the buffer to the queue + _audioBufferQueue.Enqueue(e.Buffer); + } + + public async Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation) + { + // Stop recording and clean up + _waveIn?.StopRecording(); + _waveIn?.Dispose(); + _waveIn = null; + + // Stop playback and clean up + _waveOut?.Stop(); + _waveOut?.Dispose(); + _waveOut = null; + } +} diff --git a/src/Infrastructure/BotSharp.OpenAPI/BotSharp.OpenAPI.csproj b/src/Infrastructure/BotSharp.OpenAPI/BotSharp.OpenAPI.csproj index 9162cd163..3a946c97a 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/BotSharp.OpenAPI.csproj +++ b/src/Infrastructure/BotSharp.OpenAPI/BotSharp.OpenAPI.csproj @@ -47,7 +47,8 @@ - + + diff --git a/src/Infrastructure/BotSharp.OpenAPI/ServiceBuilder.cs b/src/Infrastructure/BotSharp.OpenAPI/ServiceBuilder.cs new file mode 100644 index 000000000..422a29225 --- /dev/null +++ b/src/Infrastructure/BotSharp.OpenAPI/ServiceBuilder.cs @@ -0,0 +1,51 @@ +using BotSharp.Core; +using BotSharp.Core.Infrastructures; +using BotSharp.Core.Plugins; +using BotSharp.Logger; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + +namespace BotSharp.OpenAPI; + +public class ServiceBuilder +{ + public static IServiceProvider CreateHostBuilder() + { + Console.WriteLine("Creating host builder..."); + + // Set up configuration + var configuration = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) + .AddEnvironmentVariables() + .Build(); + + // Create host builder + var builder = Host.CreateDefaultBuilder() + .ConfigureAppConfiguration(config => + { + config.AddConfiguration(configuration); + }) + .ConfigureServices((context, services) => + { + services.AddSingleton(configuration); + services.AddBotSharpCore(context.Configuration, options => + { + }) + .AddBotSharpOpenAPI(context.Configuration, [], context.HostingEnvironment, true) + .AddBotSharpLogger(context.Configuration); + }); + + // Build the host + var host = builder.Build(); + var serviceProvider = host.Services; + + // Configure plugins + serviceProvider.GetRequiredService().Configure(null); + + // Set root services for SharpCacheAttribute + SharpCacheAttribute.Services = serviceProvider; + + return serviceProvider; + } +} diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs index fe540e697..f630f5daa 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -4,6 +4,7 @@ using GenerativeAI.Live; using GenerativeAI.Live.Extensions; using GenerativeAI.Types; +using System; namespace BotSharp.Plugin.GoogleAi.Providers.Realtime; @@ -94,6 +95,12 @@ public async Task AppenAudioBuffer(string message) await _client.SendAudioAsync(Convert.FromBase64String(message)); } + public async Task AppenAudioBuffer(ArraySegment data, int length) + { + var buffer = data.AsSpan(0, length).ToArray(); + await _client.SendAudioAsync(buffer); + } + public async Task TriggerModelInference(string? instructions = null) { await _client.SendClientContentAsync(new BidiGenerateContentClientContent() diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Settings/GoogleAiSettings.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Settings/GoogleAiSettings.cs index a4e3468a8..01ce2feeb 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Settings/GoogleAiSettings.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Settings/GoogleAiSettings.cs @@ -2,21 +2,21 @@ namespace BotSharp.Plugin.GoogleAi.Settings; public class GoogleAiSettings { - public PaLMSetting PaLM { get; set; } + public PaLMSetting PaLM { get; set; } = new(); - public GeminiSetting Gemini { get; set; } + public GeminiSetting Gemini { get; set; } = new(); } public class PaLMSetting { public string Endpoint { get; set; } = string.Empty; - public string ApiKey { get; set; } + public string ApiKey { get; set; } = string.Empty; } public class GeminiSetting { - public string ApiKey { get; set; } + public string ApiKey { get; set; } = string.Empty; public bool UseGoogleSearch { get; set; } public bool UseGrounding { get; set; } } diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs index c18a57087..97703b304 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -85,6 +85,12 @@ public async Task AppenAudioBuffer(string message) await SendEventToModel(audioAppend); } + public async Task AppenAudioBuffer(ArraySegment data, int length) + { + var message = Convert.ToBase64String(data.AsSpan(0, length).ToArray()); + await AppenAudioBuffer(message); + } + public async Task TriggerModelInference(string? instructions = null) { // Triggering model inference @@ -165,9 +171,11 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, { continue; } - _logger.LogDebug($"{nameof(RealTimeCompletionProvider)} received: {receivedText}"); + var response = JsonSerializer.Deserialize(receivedText); + _logger.LogDebug($"{nameof(RealTimeCompletionProvider)} received: {response.Type} {receivedText.Length}"); + if (response.Type == "error") { _logger.LogError($"{response.Type}: {receivedText}"); @@ -310,8 +318,8 @@ public async Task UpdateSession(RealtimeHubConnection conn, bool interru type = "session.update", session = new RealtimeSessionUpdateRequest { - InputAudioFormat = "g711_ulaw", - OutputAudioFormat = "g711_ulaw", + InputAudioFormat = realtimeModelSettings.InputAudioFormat, + OutputAudioFormat = realtimeModelSettings.OutputAudioFormat, /*InputAudioTranscription = new InputAudioTranscription { Model = realtimeModelSettings.InputAudioTranscription.Model, diff --git a/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs b/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs index 5726bf6e8..6ec723326 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs @@ -1,8 +1,12 @@ +using BotSharp.Abstraction.MLTasks; using BotSharp.Abstraction.Realtime; +using BotSharp.Abstraction.Realtime.Models; +using BotSharp.Abstraction.Routing; using BotSharp.Plugin.Twilio.Interfaces; using BotSharp.Plugin.Twilio.Models.Stream; using Microsoft.AspNetCore.Http; using System.Net.WebSockets; +using System.Threading; using Task = System.Threading.Tasks.Task; namespace BotSharp.Plugin.Twilio; @@ -63,81 +67,188 @@ private async Task HandleWebSocket(IServiceProvider services, string conversatio } convService.States.Save(); - await hub.Listen(webSocket, (receivedText) => + var buffer = new byte[1024 * 32]; + WebSocketReceiveResult result; + + do { - var response = JsonSerializer.Deserialize(receivedText); - conn.StreamId = response.StreamSid; + Array.Clear(buffer, 0, buffer.Length); + result = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count); - switch (response.Event) + if (string.IsNullOrEmpty(receivedText)) { - case "start": - conn.Event = "user_connected"; - var startResponse = JsonSerializer.Deserialize(receivedText); - conn.Data = JsonSerializer.Serialize(startResponse.Body.CustomParameters); - conn.ResetStreamState(); - break; - case "media": - conn.Event = "user_data_received"; - var mediaResponse = JsonSerializer.Deserialize(receivedText); - conn.LatestMediaTimestamp = long.Parse(mediaResponse.Body.Timestamp); - conn.Data = mediaResponse.Body.Payload; - break; - case "stop": - conn.Event = "user_disconnected"; - break; - case "mark": - conn.Event = "mark"; - if (conn.MarkQueue.Count > 0) conn.MarkQueue.TryDequeue(out var _); - break; - case "dtmf": - var dtmfResponse = JsonSerializer.Deserialize(receivedText); - if (dtmfResponse.Body.Digit == "#") - { - conn.Event = "user_dtmf_received"; - conn.Data = conn.KeypadInputBuffer; - conn.KeypadInputBuffer = string.Empty; - } - else - { - conn.Event = "user_dtmf_receiving"; - conn.KeypadInputBuffer += dtmfResponse.Body.Digit; - } - break; - default: - conn.Event = response.Event; - break; + continue; } - conn.OnModelMessageReceived = message => - new - { - @event = "media", - streamSid = response.StreamSid, - media = new { payload = message } - }; + var (eventType, data) = MapEvents(conn, receivedText); - conn.OnModelAudioResponseDone = () => - new + if (eventType == "user_connected") + { + // Connect to model + await hub.ConnectToModel(async data => { - @event = "mark", - streamSid = response.StreamSid, - mark = new { name = "responsePart" } - }; + await SendEventToUser(webSocket, data); + }); + } + else if (eventType == "user_data_received") + { + await completer.AppenAudioBuffer(data); + } + else if (eventType == "user_dtmf_receiving") + { + } + else if (eventType == "user_dtmf_received") + { + await HandleUserDtmfReceived(services, conn, completer, data); + } + else if (eventType == "user_disconnected") + { + await completer.Disconnect(); + await HandleUserDisconnected(); + } + } while (!result.CloseStatus.HasValue); + + await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); + } + + private (string, string) MapEvents(RealtimeHubConnection conn, string receivedText) + { + var response = JsonSerializer.Deserialize(receivedText); + conn.StreamId = response.StreamSid; + string eventType = response.Event; + string data = string.Empty; - conn.OnModelUserInterrupted = () => - new + switch (response.Event) + { + case "start": + eventType = "user_connected"; + var startResponse = JsonSerializer.Deserialize(receivedText); + data = JsonSerializer.Serialize(startResponse.Body.CustomParameters); + conn.ResetStreamState(); + break; + case "media": + eventType = "user_data_received"; + var mediaResponse = JsonSerializer.Deserialize(receivedText); + conn.LatestMediaTimestamp = long.Parse(mediaResponse.Body.Timestamp); + data = mediaResponse.Body.Payload; + break; + case "stop": + eventType = "user_disconnected"; + break; + case "mark": + eventType = "mark"; + if (conn.MarkQueue.Count > 0) conn.MarkQueue.TryDequeue(out var _); + break; + case "dtmf": + var dtmfResponse = JsonSerializer.Deserialize(receivedText); + if (dtmfResponse.Body.Digit == "#") { - @event = "clear", - streamSid = response.StreamSid - }; + eventType = "user_dtmf_received"; + data = conn.KeypadInputBuffer; + conn.KeypadInputBuffer = string.Empty; + } + else + { + eventType = "user_dtmf_receiving"; + conn.KeypadInputBuffer += dtmfResponse.Body.Digit; + } + break; + default: + eventType = response.Event; + break; + } + + conn.OnModelMessageReceived = message => + JsonSerializer.Serialize(new + { + @event = "media", + streamSid = response.StreamSid, + media = new { payload = message } + }); + + conn.OnModelAudioResponseDone = () => + JsonSerializer.Serialize(new + { + @event = "mark", + streamSid = response.StreamSid, + mark = new { name = "responsePart" } + }); + + conn.OnModelUserInterrupted = () => + JsonSerializer.Serialize(new + { + @event = "clear", + streamSid = response.StreamSid + }); + + /*if (response.Event == "dtmf") + { + // Send a Stop command to Twilio + string stopPlaybackCommand = "{ \"action\": \"stop_playback\" }"; + var stopBytes = Encoding.UTF8.GetBytes(stopPlaybackCommand); + webSocket.SendAsync(new ArraySegment(stopBytes), WebSocketMessageType.Text, true, CancellationToken.None); + }*/ + + return (eventType, data); + } + + private async Task HandleUserDisconnected() + { - /*if (response.Event == "dtmf") + } + + private async Task SendMark(WebSocket userWebSocket, RealtimeHubConnection conn) + { + if (!string.IsNullOrEmpty(conn.StreamId)) + { + var markEvent = new { - // Send a Stop command to Twilio - string stopPlaybackCommand = "{ \"action\": \"stop_playback\" }"; - var stopBytes = Encoding.UTF8.GetBytes(stopPlaybackCommand); - webSocket.SendAsync(new ArraySegment(stopBytes), WebSocketMessageType.Text, true, CancellationToken.None); - }*/ - }); + @event = "mark", + streamSid = conn.StreamId, + mark = new { name = "responsePart" } + }; + var message = JsonSerializer.Serialize(markEvent); + await SendEventToUser(userWebSocket, message); + conn.MarkQueue.Enqueue("responsePart"); + } + } + + private async Task SendEventToUser(WebSocket webSocket, string message) + { + var buffer = Encoding.UTF8.GetBytes(message); + await webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Text, true, CancellationToken.None); + } + + private async Task HandleUserDtmfReceived(IServiceProvider _services, RealtimeHubConnection conn, IRealTimeCompletion completer, string data) + { + var routing = _services.GetRequiredService(); + var hookProvider = _services.GetRequiredService(); + var agentService = _services.GetRequiredService(); + var agent = await agentService.LoadAgent(conn.CurrentAgentId); + var dialogs = routing.Context.GetDialogs(); + var convService = _services.GetRequiredService(); + var conversation = await convService.GetConversation(conn.ConversationId); + + var message = new RoleDialogModel(AgentRole.User, data) + { + CurrentAgentId = routing.Context.GetCurrentAgentId() + }; + dialogs.Add(message); + + var storage = _services.GetRequiredService(); + storage.Append(conn.ConversationId, message); + + foreach (var hook in hookProvider.HooksOrderByPriority) + { + hook.SetAgent(agent) + .SetConversation(conversation); + + await hook.OnMessageReceived(message); + } + + await completer.InsertConversationItem(message); + var instruction = await completer.UpdateSession(conn); + await completer.TriggerModelInference($"{instruction}\r\n\r\nReply based on the user input: {message.Content}"); } } diff --git a/tests/BotSharp.Test.RealtimeVoice/BotSharp.Test.RealtimeVoice.csproj b/tests/BotSharp.Test.RealtimeVoice/BotSharp.Test.RealtimeVoice.csproj new file mode 100644 index 000000000..6de9c2053 --- /dev/null +++ b/tests/BotSharp.Test.RealtimeVoice/BotSharp.Test.RealtimeVoice.csproj @@ -0,0 +1,31 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + PreserveNewest + + + + + + + + + + + + + + + diff --git a/tests/BotSharp.Test.RealtimeVoice/Program.cs b/tests/BotSharp.Test.RealtimeVoice/Program.cs new file mode 100644 index 000000000..7ed4383fb --- /dev/null +++ b/tests/BotSharp.Test.RealtimeVoice/Program.cs @@ -0,0 +1,103 @@ +using BotSharp.Abstraction.Conversations.Enums; +using BotSharp.Abstraction.Conversations.Models; +using BotSharp.Abstraction.Conversations; +using BotSharp.OpenAPI; +using System.Text.Json; +using Google.Ai.Generativelanguage.V1Beta2; + +var services = ServiceBuilder.CreateHostBuilder(); +var channel = services.GetRequiredService(); + +Console.WriteLine("PCM-16 Microphone Capture (24kHz Sample Rate)"); +Console.WriteLine("-----------------------------------------------"); + +var convService = services.GetRequiredService(); +var conv = new Conversation +{ + AgentId = "01e2fc5c-2c89-4ec7-8470-7688608b496c", + Channel = ConversationChannel.Phone, + Title = $"Test", + Tags = [], +}; +conv = await convService.NewConversation(conv); + +await channel.ConnectAsync(conv.Id); + +var hub = services.GetRequiredService(); +var conn = hub.SetHubConnection(conv.Id); +var completer = hub.SetCompleter("openai"); + +await hub.ConnectToModel(async data => +{ + var response = JsonSerializer.Deserialize(data); + if (response.Event == "media") + { + var message = JsonSerializer.Deserialize(data); + await channel.SendAsync(Convert.FromBase64String(message.Media), CancellationToken.None); + } +}); + +StreamReceiveResult result; +var buffer = new byte[1024 * 8]; + +conn.OnModelMessageReceived = message => + JsonSerializer.Serialize(new + { + @event = "media", + media = message + }); + +conn.OnModelAudioResponseDone = () => + JsonSerializer.Serialize(new + { + @event = "mark", + mark = new { name = "responsePart" } + }); + +conn.OnModelUserInterrupted = () => + JsonSerializer.Serialize(new + { + @event = "clear" + }); + +do +{ + var seg = new ArraySegment(buffer); + result = await channel.ReceiveAsync(seg, CancellationToken.None); + + await completer.AppenAudioBuffer(seg, result.Count); + + // Display the audio level + int audioLevel = CalculateAudioLevel(buffer, result.Count); + DisplayAudioLevel(audioLevel); +} while (result.Status == StreamChannelStatus.Open); + +int CalculateAudioLevel(byte[] buffer, int bytesRecorded) +{ + // Simple audio level calculation (RMS) + int sum = 0; + for (int i = 0; i < bytesRecorded; i += 2) + { + if (i + 1 < bytesRecorded) + { + short sample = (short)((buffer[i + 1] << 8) | buffer[i]); + sum += Math.Abs(sample); + } + } + return bytesRecorded > 0 ? sum / (bytesRecorded / 2) : 0; +} + +void DisplayAudioLevel(int level) +{ + // Normalize level to 0-50 range for display + int displayLevel = Math.Min(50, level / 100); + + // Clear the current line + Console.Write("\r" + new string(' ', 60)); + + // Display audio level as a bar + Console.Write("\rMicrophone: ["); + Console.Write(new string('#', displayLevel)); + Console.Write(new string(' ', 50 - displayLevel)); + Console.Write("]"); +} diff --git a/tests/BotSharp.Test.RealtimeVoice/Using.cs b/tests/BotSharp.Test.RealtimeVoice/Using.cs new file mode 100644 index 000000000..2a04d907c --- /dev/null +++ b/tests/BotSharp.Test.RealtimeVoice/Using.cs @@ -0,0 +1,11 @@ +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Configuration; + +global using BotSharp.Core; +global using BotSharp.Core.Infrastructures; +global using BotSharp.Core.Plugins; +global using BotSharp.Logger; +global using BotSharp.Abstraction.Realtime.Models; +global using BotSharp.Abstraction.Realtime; +global using BotSharp.Abstraction.Realtime.Enums; diff --git a/tests/BotSharp.Test.RealtimeVoice/appsettings.json b/tests/BotSharp.Test.RealtimeVoice/appsettings.json new file mode 100644 index 000000000..28ada7ddb --- /dev/null +++ b/tests/BotSharp.Test.RealtimeVoice/appsettings.json @@ -0,0 +1,63 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + + "LlmProviders": [ + { + "Provider": "openai", + "Models": [ + { + "Id": "gpt-4o", + "Name": "gpt-4o-mini-realtime-preview", + "Version": "2024-12-17", + "ApiKey": "", + "Type": "realtime", + "MultiModal": true, + "PromptCost": 0.0025, + "CompletionCost": 0.01 + } + ] + }, + { + "Provider": "google-ai", + "Models": [ + { + "Id": "gemini-2.0", + "Name": "gemini-2.0-flash-exp", + "Version": "20240620", + "ApiKey": "", + "Type": "realtime", + "MultiModal": true, + "PromptCost": 0.003, + "CompletionCost": 0.015 + } + ] + } + ], + + "Database": { + "Default": "FileRepository", + "TablePrefix": "BotSharp", + "BotSharpMongoDb": "", + "FileRepository": "data", + "Assemblies": [ "BotSharp.Core" ] + }, + + "RealtimeModel": { + "InputAudioFormat": "pcm16", + "OutputAudioFormat": "pcm16" + }, + + "PluginLoader": { + "Assemblies": [ + "BotSharp.Core", + "BotSharp.Core.Realtime", + "BotSharp.Plugin.OpenAI", + "BotSharp.Plugin.GoogleAI" + ] + } +}