From 573e887558465d92b67761abf031b8227ec2c926 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 22 Jul 2025 03:43:09 +0900 Subject: [PATCH 01/37] Organize imports --- lib/src/core/engine.dart | 14 +++++++++++--- lib/src/data_stream/stream_writer.dart | 6 +++--- lib/src/livekit.dart | 2 +- lib/src/track/audio_visualizer.dart | 2 +- lib/src/track/audio_visualizer_web.dart | 2 +- lib/src/track/local/video.dart | 5 ++--- lib/src/types/participant_permissions.dart | 3 +-- 7 files changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 331bf3299..25b371af8 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -16,23 +16,31 @@ import 'dart:async'; -import 'package:flutter/foundation.dart'; - import 'package:collection/collection.dart'; import 'package:connectivity_plus/connectivity_plus.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; -import 'package:livekit_client/livekit_client.dart'; +import '../events.dart'; +import '../exceptions.dart'; import '../extensions.dart'; import '../internal/events.dart'; import '../internal/types.dart'; +import '../logger.dart' show logger; +import '../managers/event.dart'; +import '../options.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; +import '../publication/local.dart'; import '../support/disposable.dart'; +import '../support/platform.dart' show lkPlatformIsTest, lkPlatformIs, PlatformType; import '../support/region_url_provider.dart'; import '../support/websocket.dart'; +import '../track/local/local.dart'; +import '../track/local/video.dart'; import '../types/internal.dart'; +import '../types/other.dart'; import 'signal_client.dart'; import 'transport.dart'; diff --git a/lib/src/data_stream/stream_writer.dart b/lib/src/data_stream/stream_writer.dart index c60e37e89..1d2832a80 100644 --- a/lib/src/data_stream/stream_writer.dart +++ b/lib/src/data_stream/stream_writer.dart @@ -3,11 +3,11 @@ import 'dart:typed_data'; import 'package:fixnum/fixnum.dart'; -import 'package:livekit_client/src/core/engine.dart'; -import 'package:livekit_client/src/types/other.dart'; -import 'package:livekit_client/src/utils.dart'; +import '../core/engine.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../types/data_stream.dart'; +import '../types/other.dart'; +import '../utils.dart'; class BaseStreamWriter { final StreamWriter writableStream; diff --git a/lib/src/livekit.dart b/lib/src/livekit.dart index acfd4fa50..428240184 100644 --- a/lib/src/livekit.dart +++ b/lib/src/livekit.dart @@ -14,8 +14,8 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import 'package:livekit_client/livekit_client.dart'; import 'support/native.dart'; +import 'support/platform.dart' show lkPlatformIsMobile; /// Main entry point to connect to a room. /// {@category Room} diff --git a/lib/src/track/audio_visualizer.dart b/lib/src/track/audio_visualizer.dart index 13718e771..9f4982ae0 100644 --- a/lib/src/track/audio_visualizer.dart +++ b/lib/src/track/audio_visualizer.dart @@ -1,6 +1,6 @@ import 'package:uuid/uuid.dart' as uuid; -import 'package:livekit_client/src/support/disposable.dart'; +import '../support/disposable.dart'; import '../events.dart' show AudioVisualizerEvent; import '../managers/event.dart' show EventsEmittable; import 'local/local.dart' show AudioTrack; diff --git a/lib/src/track/audio_visualizer_web.dart b/lib/src/track/audio_visualizer_web.dart index c57c64ef8..fc34140e6 100644 --- a/lib/src/track/audio_visualizer_web.dart +++ b/lib/src/track/audio_visualizer_web.dart @@ -5,7 +5,7 @@ import 'dart:typed_data'; import 'package:flutter_webrtc/flutter_webrtc.dart'; -import 'package:livekit_client/src/events.dart' show AudioVisualizerEvent; +import '../events.dart' show AudioVisualizerEvent; import '../logger.dart' show logger; import 'audio_visualizer.dart'; import 'local/local.dart' show AudioTrack; diff --git a/lib/src/track/local/video.dart b/lib/src/track/local/video.dart index 44d3afc94..f2da07c3e 100644 --- a/lib/src/track/local/video.dart +++ b/lib/src/track/local/video.dart @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -import 'package:flutter/foundation.dart'; - import 'package:collection/collection.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import 'package:livekit_client/src/extensions.dart'; import '../../events.dart'; import '../../exceptions.dart'; +import '../../extensions.dart'; import '../../logger.dart'; import '../../options.dart'; import '../../proto/livekit_models.pb.dart' as lk_models; diff --git a/lib/src/types/participant_permissions.dart b/lib/src/types/participant_permissions.dart index 36d6ce41c..22ac7faab 100644 --- a/lib/src/types/participant_permissions.dart +++ b/lib/src/types/participant_permissions.dart @@ -14,7 +14,6 @@ import 'package:meta/meta.dart'; -import 'package:livekit_client/src/proto/livekit_models.pbenum.dart'; import '../proto/livekit_models.pb.dart' as lk_models; @immutable @@ -24,7 +23,7 @@ class ParticipantPermissions { final bool canPublishData; final bool hidden; final bool canUpdateMetadata; - final List canPublishSources; + final List canPublishSources; const ParticipantPermissions({ this.canSubscribe = false, From 4655b0be782df237769adb26ceb08b3cbcc5a330 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 22 Jul 2025 03:47:33 +0900 Subject: [PATCH 02/37] Update options.dart --- lib/src/options.dart | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/src/options.dart b/lib/src/options.dart index 7d896ebf3..1f64401d1 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -14,12 +14,9 @@ import 'constants.dart'; import 'e2ee/options.dart'; -import 'proto/livekit_models.pb.dart'; -import 'publication/remote.dart'; import 'track/local/audio.dart'; import 'track/local/video.dart'; import 'track/options.dart'; -import 'track/track.dart'; import 'types/other.dart'; import 'types/video_encoding.dart'; import 'types/video_parameters.dart'; From 808c18bc050310a5e545a1fcab68161f8fc8a215 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 22 Jul 2025 20:43:51 +0900 Subject: [PATCH 03/37] Impl --- ios/Classes/AudioRenderer.swift | 1 + macos/Classes/AudioRenderer.swift | 1 + shared_swift/AudioRenderer.swift | 68 +++++ shared_swift/LiveKitPlugin.swift | 417 ++++++++++++++++++------------ 4 files changed, 318 insertions(+), 169 deletions(-) create mode 120000 ios/Classes/AudioRenderer.swift create mode 120000 macos/Classes/AudioRenderer.swift create mode 100644 shared_swift/AudioRenderer.swift diff --git a/ios/Classes/AudioRenderer.swift b/ios/Classes/AudioRenderer.swift new file mode 120000 index 000000000..a1bce0cda --- /dev/null +++ b/ios/Classes/AudioRenderer.swift @@ -0,0 +1 @@ +../../shared_swift/AudioRenderer.swift \ No newline at end of file diff --git a/macos/Classes/AudioRenderer.swift b/macos/Classes/AudioRenderer.swift new file mode 120000 index 000000000..a1bce0cda --- /dev/null +++ b/macos/Classes/AudioRenderer.swift @@ -0,0 +1 @@ +../../shared_swift/AudioRenderer.swift \ No newline at end of file diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift new file mode 100644 index 000000000..71d8b479e --- /dev/null +++ b/shared_swift/AudioRenderer.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import AVFoundation +import WebRTC + +#if os(macOS) + import Cocoa + import FlutterMacOS +#else + import Flutter + import UIKit +#endif + +public class AudioRenderer: NSObject { + private var eventSink: FlutterEventSink? + private var channel: FlutterEventChannel? + + private weak var _track: AudioTrack? + + public init(track: AudioTrack?, + binaryMessenger: FlutterBinaryMessenger, + rendererId: String) + { + _track = track + super.init() + _track?.add(audioRenderer: self) + + let channelName = "io.livekit.audio.renderer/eventchannel-" + rendererId + channel = FlutterEventChannel(name: channelName, binaryMessenger: binaryMessenger) + channel?.setStreamHandler(self) + } + + deinit { + _track?.remove(audioRenderer: self) + } +} + +extension AudioRenderer: FlutterStreamHandler { + public func onListen(withArguments _: Any?, eventSink events: @escaping FlutterEventSink) -> FlutterError? { + eventSink = events + return nil + } + + public func onCancel(withArguments _: Any?) -> FlutterError? { + eventSink = nil + return nil + } +} + +extension AudioRenderer: RTCAudioRenderer { + public func render(pcmBuffer _: AVAudioPCMBuffer) { + eventSink?("audio_renderer_event") + } +} diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index aed2f4478..9e6cac6fd 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -12,36 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -import WebRTC import flutter_webrtc +import WebRTC #if os(macOS) -import Cocoa -import FlutterMacOS + import Cocoa + import FlutterMacOS #else -import Flutter -import UIKit -import Combine + import Combine + import Flutter + import UIKit #endif +let trackIdKey = "visualizerId" +let visualizerIdKey = "visualizerId" +let rendererIdKey = "rendererId" + +class AudioProcessors { + var track: AudioTrack + var visualizers: [String: Visualizer] = [:] + var renderers: [String: AudioRenderer] = [:] + + init(track: AudioTrack) { + self.track = track + } +} + @available(iOS 13.0, *) public class LiveKitPlugin: NSObject, FlutterPlugin { - - var processors: Dictionary = [:] - var tracks: Dictionary = [:] + // TrackId: AudioProcessors + var audioProcessors: [String: AudioProcessors] = [:] var binaryMessenger: FlutterBinaryMessenger? #if os(iOS) - var cancellable = Set() + var cancellable = Set() #endif public static func register(with registrar: FlutterPluginRegistrar) { - #if os(macOS) - let messenger = registrar.messenger + let messenger = registrar.messenger #else - let messenger = registrar.messenger() + let messenger = registrar.messenger() #endif let channel = FlutterMethodChannel(name: "livekit_client", binaryMessenger: messenger) @@ -50,198 +62,265 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { registrar.addMethodCallDelegate(instance, channel: channel) #if os(iOS) - BroadcastManager.shared.isBroadcastingPublisher - .sink { isBroadcasting in - channel.invokeMethod("broadcastStateChanged", arguments: isBroadcasting) - } - .store(in: &instance.cancellable) + BroadcastManager.shared.isBroadcastingPublisher + .sink { isBroadcasting in + channel.invokeMethod("broadcastStateChanged", arguments: isBroadcasting) + } + .store(in: &instance.cancellable) #endif } #if !os(macOS) - // https://developer.apple.com/documentation/avfaudio/avaudiosession/category - let categoryMap: [String: AVAudioSession.Category] = [ - "ambient": .ambient, - "multiRoute": .multiRoute, - "playAndRecord": .playAndRecord, - "playback": .playback, - "record": .record, - "soloAmbient": .soloAmbient - ] - - // https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions - let categoryOptionsMap: [String: AVAudioSession.CategoryOptions] = [ - "mixWithOthers": .mixWithOthers, - "duckOthers": .duckOthers, - "interruptSpokenAudioAndMixWithOthers": .interruptSpokenAudioAndMixWithOthers, - "allowBluetooth": .allowBluetooth, - "allowBluetoothA2DP": .allowBluetoothA2DP, - "allowAirPlay": .allowAirPlay, - "defaultToSpeaker": .defaultToSpeaker - // @available(iOS 14.5, *) - // "overrideMutedMicrophoneInterruption": .overrideMutedMicrophoneInterruption, - ] - - // https://developer.apple.com/documentation/avfaudio/avaudiosession/mode - let modeMap: [String: AVAudioSession.Mode] = [ - "default": .default, - "gameChat": .gameChat, - "measurement": .measurement, - "moviePlayback": .moviePlayback, - "spokenAudio": .spokenAudio, - "videoChat": .videoChat, - "videoRecording": .videoRecording, - "voiceChat": .voiceChat, - "voicePrompt": .voicePrompt - ] - - private func categoryOptions(fromFlutter options: [String]) -> AVAudioSession.CategoryOptions { - var result: AVAudioSession.CategoryOptions = [] - for option in categoryOptionsMap { - if options.contains(option.key) { - result.insert(option.value) + // https://developer.apple.com/documentation/avfaudio/avaudiosession/category + let categoryMap: [String: AVAudioSession.Category] = [ + "ambient": .ambient, + "multiRoute": .multiRoute, + "playAndRecord": .playAndRecord, + "playback": .playback, + "record": .record, + "soloAmbient": .soloAmbient, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions + let categoryOptionsMap: [String: AVAudioSession.CategoryOptions] = [ + "mixWithOthers": .mixWithOthers, + "duckOthers": .duckOthers, + "interruptSpokenAudioAndMixWithOthers": .interruptSpokenAudioAndMixWithOthers, + "allowBluetooth": .allowBluetooth, + "allowBluetoothA2DP": .allowBluetoothA2DP, + "allowAirPlay": .allowAirPlay, + "defaultToSpeaker": .defaultToSpeaker, + // @available(iOS 14.5, *) + // "overrideMutedMicrophoneInterruption": .overrideMutedMicrophoneInterruption, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/mode + let modeMap: [String: AVAudioSession.Mode] = [ + "default": .default, + "gameChat": .gameChat, + "measurement": .measurement, + "moviePlayback": .moviePlayback, + "spokenAudio": .spokenAudio, + "videoChat": .videoChat, + "videoRecording": .videoRecording, + "voiceChat": .voiceChat, + "voicePrompt": .voicePrompt, + ] + + private func categoryOptions(fromFlutter options: [String]) -> AVAudioSession.CategoryOptions { + var result: AVAudioSession.CategoryOptions = [] + for option in categoryOptionsMap { + if options.contains(option.key) { + result.insert(option.value) + } } + return result } - return result - } #endif - public func handleStartAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) { + private func audioProcessors(for trackId: String) -> AudioProcessors? { + if let existing = audioProcessors[trackId] { + return existing + } + let webrtc = FlutterWebRTCPlugin.sharedSingleton() - let trackId = args["trackId"] as? String - let visualizerId = args["visualizerId"] as? String + var audioTrack: AudioTrack? + if let track = webrtc?.localTracks![trackId] as? LocalAudioTrack { + audioTrack = LKLocalAudioTrack(name: trackId, track: track) + } else if let track = webrtc?.remoteTrack(forId: trackId) as? RTCAudioTrack { + audioTrack = LKRemoteAudioTrack(name: trackId, track: track) + } + + guard let audioTrack else { + return nil + } + + let processor = AudioProcessors(track: audioTrack) + audioProcessors[trackId] = processor + return processor + } + + public func handleStartAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) { + // Required params + let trackId = args[trackIdKey] as? String + let visualizerId = args[visualizerIdKey] as? String + + guard let trackId else { + result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil)) + return + } + + guard let visualizerId else { + result(FlutterError(code: visualizerIdKey, message: "\(visualizerIdKey) is required", details: nil)) + return + } + + // Optional params let barCount = args["barCount"] as? Int ?? 7 let isCentered = args["isCentered"] as? Bool ?? true let smoothTransition = args["smoothTransition"] as? Bool ?? true - if visualizerId == nil { - result(FlutterError(code: "visualizerId", message: "visualizerId is required", details: nil)) + guard let processors = audioProcessors(for: trackId) else { + result(FlutterError(code: trackIdKey, message: "No such track", details: nil)) return } - if let unwrappedTrackId = trackId { - let unwrappedVisualizerId = visualizerId! - - let localTrack = webrtc?.localTracks![unwrappedTrackId] - if let audioTrack = localTrack as? LocalAudioTrack { - let lkLocalTrack = LKLocalAudioTrack(name: unwrappedTrackId, track: audioTrack); - let processor = Visualizer(track: lkLocalTrack, - binaryMessenger: self.binaryMessenger!, - bandCount: barCount, - isCentered: isCentered, - smoothTransition: smoothTransition, - visualizerId: unwrappedVisualizerId) - - tracks[unwrappedTrackId] = lkLocalTrack - processors[unwrappedVisualizerId] = processor - - } - - let track = webrtc?.remoteTrack(forId: unwrappedTrackId) - if let audioTrack = track as? RTCAudioTrack { - let lkRemoteTrack = LKRemoteAudioTrack(name: unwrappedTrackId, track: audioTrack); - let processor = Visualizer(track: lkRemoteTrack, - binaryMessenger: self.binaryMessenger!, - bandCount: barCount, - isCentered: isCentered, - smoothTransition: smoothTransition, - visualizerId: unwrappedVisualizerId) - tracks[unwrappedTrackId] = lkRemoteTrack - processors[unwrappedVisualizerId] = processor - } + // Already exists + if processors.visualizers[visualizerId] != nil { + result(true) + return } + let visualizer = Visualizer(track: processors.track, + binaryMessenger: binaryMessenger!, + bandCount: barCount, + isCentered: isCentered, + smoothTransition: smoothTransition, + visualizerId: visualizerId) + // Retain + processors.visualizers[visualizerId] = visualizer result(true) } public func handleStopAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) { - let trackId = args["trackId"] as? String - let visualizerId = args["visualizerId"] as? String - if let unwrappedTrackId = trackId { - for key in tracks.keys { - if key == unwrappedTrackId { - tracks.removeValue(forKey: key) - } - } + // let trackId = args["trackId"] as? String + let visualizerId = args[visualizerIdKey] as? String + + guard let visualizerId else { + result(FlutterError(code: visualizerIdKey, message: "\(visualizerIdKey) is required", details: nil)) + return } - if let unwrappedVisualizerId = visualizerId { - processors.removeValue(forKey: unwrappedVisualizerId) + + for processors in audioProcessors.values { + processors.visualizers.removeValue(forKey: visualizerId) } + result(true) } - public func handleConfigureNativeAudio(args: [String: Any?], result: @escaping FlutterResult) { + public func handleStartAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) { + // Required params + let trackId = args[trackIdKey] as? String + let rendererId = args[rendererIdKey] as? String - #if os(macOS) - result(FlutterMethodNotImplemented) - #else - - let configuration = RTCAudioSessionConfiguration.webRTC() + guard let trackId else { + result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil)) + return + } - // Category - if let string = args["appleAudioCategory"] as? String, - let category = categoryMap[string] { - configuration.category = category.rawValue - print("[LiveKit] Configuring category: ", configuration.category) + guard let rendererId else { + result(FlutterError(code: rendererIdKey, message: "\(rendererIdKey) is required", details: nil)) + return } - // CategoryOptions - if let strings = args["appleAudioCategoryOptions"] as? [String] { - configuration.categoryOptions = categoryOptions(fromFlutter: strings) - print("[LiveKit] Configuring categoryOptions: ", strings) + guard let processors = audioProcessors(for: trackId) else { + result(FlutterError(code: trackIdKey, message: "No such track", details: nil)) + return } - // Mode - if let string = args["appleAudioMode"] as? String, - let mode = modeMap[string] { - configuration.mode = mode.rawValue - print("[LiveKit] Configuring mode: ", configuration.mode) + // Already exists + if processors.visualizers[rendererId] != nil { + result(true) + return } - // get `RTCAudioSession` and lock - let rtcSession = RTCAudioSession.sharedInstance() - rtcSession.lockForConfiguration() + let renderer = AudioRenderer(track: processors.track, + binaryMessenger: binaryMessenger!, + rendererId: rendererId) + // Retain + processors.renderers[rendererId] = renderer - var isLocked: Bool = true - let unlock = { - guard isLocked else { - print("[LiveKit] not locked, ignoring unlock") - return - } - rtcSession.unlockForConfiguration() - isLocked = false + result(true) + } + + public func handleStopAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) { + let rendererId = args[rendererIdKey] as? String + + guard let rendererId else { + result(FlutterError(code: rendererIdKey, message: "\(rendererIdKey) is required", details: nil)) + return } - // always `unlock()` when exiting scope, calling multiple times has no side-effect - defer { - unlock() + for processors in audioProcessors.values { + processors.renderers.removeValue(forKey: rendererId) } - do { - try rtcSession.setConfiguration(configuration, active: true) - // unlock here before configuring `AVAudioSession` - // unlock() - print("[LiveKit] RTCAudioSession Configure success") - - // also configure longFormAudio - // let avSession = AVAudioSession.sharedInstance() - // try avSession.setCategory(AVAudioSession.Category(rawValue: configuration.category), - // mode: AVAudioSession.Mode(rawValue: configuration.mode), - // policy: .default, - // options: configuration.categoryOptions) - // print("[LiveKit] AVAudioSession Configure success") - - // preferSpeakerOutput - if let preferSpeakerOutput = args["preferSpeakerOutput"] as? Bool { - try rtcSession.overrideOutputAudioPort(preferSpeakerOutput ? .speaker : .none) + result(true) + } + + public func handleConfigureNativeAudio(args: [String: Any?], result: @escaping FlutterResult) { + #if os(macOS) + result(FlutterMethodNotImplemented) + #else + + let configuration = RTCAudioSessionConfiguration.webRTC() + + // Category + if let string = args["appleAudioCategory"] as? String, + let category = categoryMap[string] + { + configuration.category = category.rawValue + print("[LiveKit] Configuring category: ", configuration.category) + } + + // CategoryOptions + if let strings = args["appleAudioCategoryOptions"] as? [String] { + configuration.categoryOptions = categoryOptions(fromFlutter: strings) + print("[LiveKit] Configuring categoryOptions: ", strings) + } + + // Mode + if let string = args["appleAudioMode"] as? String, + let mode = modeMap[string] + { + configuration.mode = mode.rawValue + print("[LiveKit] Configuring mode: ", configuration.mode) + } + + // get `RTCAudioSession` and lock + let rtcSession = RTCAudioSession.sharedInstance() + rtcSession.lockForConfiguration() + + var isLocked = true + let unlock = { + guard isLocked else { + print("[LiveKit] not locked, ignoring unlock") + return + } + rtcSession.unlockForConfiguration() + isLocked = false + } + + // always `unlock()` when exiting scope, calling multiple times has no side-effect + defer { + unlock() + } + + do { + try rtcSession.setConfiguration(configuration, active: true) + // unlock here before configuring `AVAudioSession` + // unlock() + print("[LiveKit] RTCAudioSession Configure success") + + // also configure longFormAudio + // let avSession = AVAudioSession.sharedInstance() + // try avSession.setCategory(AVAudioSession.Category(rawValue: configuration.category), + // mode: AVAudioSession.Mode(rawValue: configuration.mode), + // policy: .default, + // options: configuration.categoryOptions) + // print("[LiveKit] AVAudioSession Configure success") + + // preferSpeakerOutput + if let preferSpeakerOutput = args["preferSpeakerOutput"] as? Bool { + try rtcSession.overrideOutputAudioPort(preferSpeakerOutput ? .speaker : .none) + } + result(true) + } catch { + print("[LiveKit] Configure audio error: ", error) + result(FlutterError(code: "configure", message: error.localizedDescription, details: nil)) } - result(true) - } catch let error { - print("[LiveKit] Configure audio error: ", error) - result(FlutterError(code: "configure", message: error.localizedDescription, details: nil)) - } #endif } @@ -258,7 +337,7 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { if osVersion.patchVersion != 0 { versions.append(osVersion.patchVersion) } - return versions.map({ String($0) }).joined(separator: ".") + return versions.map { String($0) }.joined(separator: ".") } public func handle(_ call: FlutterMethodCall, result: @escaping FlutterResult) { @@ -278,12 +357,12 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { case "osVersionString": result(LiveKitPlugin.osVersionString()) #if os(iOS) - case "broadcastRequestActivation": - BroadcastManager.shared.requestActivation() - result(true) - case "broadcastRequestStop": - BroadcastManager.shared.requestStop() - result(true) + case "broadcastRequestActivation": + BroadcastManager.shared.requestActivation() + result(true) + case "broadcastRequestStop": + BroadcastManager.shared.requestStop() + result(true) #endif default: print("[LiveKit] method not found: ", call.method) From d5b84229348995eb61dfcde2d903312ace5c4df5 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 22 Jul 2025 20:51:59 +0900 Subject: [PATCH 04/37] Buffer --- shared_swift/AudioRenderer.swift | 76 +++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index 71d8b479e..2d7b7ae41 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -62,7 +62,79 @@ extension AudioRenderer: FlutterStreamHandler { } extension AudioRenderer: RTCAudioRenderer { - public func render(pcmBuffer _: AVAudioPCMBuffer) { - eventSink?("audio_renderer_event") + public func render(pcmBuffer: AVAudioPCMBuffer) { + guard let eventSink = eventSink else { return } + + // Extract audio format information + let sampleRate = pcmBuffer.format.sampleRate + let channelCount = pcmBuffer.format.channelCount + let frameLength = pcmBuffer.frameLength + + // The format of the data: + // { + // "sampleRate": 48000.0, + // "channelCount": 2, + // "frameLength": 480, + // "format": "float32", // or "int16", "int32", "unknown" + // "data": [ + // [/* channel 0 audio samples */], + // [/* channel 1 audio samples */] + // ] + // } + + // Create the result dictionary to send to Flutter + var result: [String: Any] = [ + "sampleRate": sampleRate, + "channelCount": channelCount, + "frameLength": frameLength, + ] + + // Extract audio data based on the buffer format + if let floatChannelData = pcmBuffer.floatChannelData { + // Buffer contains float data + var channelsData: [[Float]] = [] + + for channel in 0 ..< Int(channelCount) { + let channelPointer = floatChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["format"] = "float32" + } else if let int16ChannelData = pcmBuffer.int16ChannelData { + // Buffer contains int16 data + var channelsData: [[Int16]] = [] + + for channel in 0 ..< Int(channelCount) { + let channelPointer = int16ChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["format"] = "int16" + } else if let int32ChannelData = pcmBuffer.int32ChannelData { + // Buffer contains int32 data + var channelsData: [[Int32]] = [] + + for channel in 0 ..< Int(channelCount) { + let channelPointer = int32ChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["format"] = "int32" + } else { + // Fallback - send minimal info if no recognizable data format + result["data"] = [] + result["format"] = "unknown" + } + + // Send the result to Flutter on the main thread + DispatchQueue.main.async { + eventSink(result) + } } } From 2440c3712f60c741b56c8afbf5b1e7a8b423322e Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 22 Jul 2025 21:03:39 +0900 Subject: [PATCH 05/37] start stop --- shared_swift/LiveKitPlugin.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index 9e6cac6fd..53fd115bb 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -354,6 +354,10 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { handleStartAudioVisualizer(args: args, result: result) case "stopVisualizer": handleStopAudioVisualizer(args: args, result: result) + case "startAudioRenderer": + handleStartAudioRenderer(args: args, result: result) + case "stopAudioRenderer": + handleStopAudioRenderer(args: args, result: result) case "osVersionString": result(LiveKitPlugin.osVersionString()) #if os(iOS) From ae642445cbfbf263597fbc421a0043b2e7c8aaa4 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:38:57 +0900 Subject: [PATCH 06/37] pre-connect impl --- lib/livekit_client.dart | 2 + lib/src/core/room.dart | 4 + lib/src/core/room_preconnect.dart | 26 +++ lib/src/options.dart | 9 +- lib/src/participant/local.dart | 8 + .../preconnect/pre_connect_audio_buffer.dart | 205 ++++++++++++++++++ lib/src/support/native.dart | 36 +++ pubspec.lock | 8 +- shared_swift/AudioRenderer.swift | 19 +- shared_swift/LiveKitPlugin.swift | 11 +- 10 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 lib/src/core/room_preconnect.dart create mode 100644 lib/src/preconnect/pre_connect_audio_buffer.dart diff --git a/lib/livekit_client.dart b/lib/livekit_client.dart index f3415b4de..bf0f1915a 100644 --- a/lib/livekit_client.dart +++ b/lib/livekit_client.dart @@ -17,6 +17,7 @@ library livekit_client; export 'src/constants.dart'; export 'src/core/room.dart'; +export 'src/core/room_preconnect.dart'; export 'src/data_stream/stream_reader.dart'; export 'src/data_stream/stream_writer.dart'; export 'src/e2ee/e2ee_manager.dart'; @@ -34,6 +35,7 @@ export 'src/options.dart'; export 'src/participant/local.dart'; export 'src/participant/participant.dart'; export 'src/participant/remote.dart'; +export 'src/preconnect/pre_connect_audio_buffer.dart'; export 'src/publication/local.dart'; export 'src/publication/remote.dart'; export 'src/publication/track_publication.dart'; diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 58027f909..f6e36927f 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -34,6 +34,7 @@ import '../options.dart'; import '../participant/local.dart'; import '../participant/participant.dart'; import '../participant/remote.dart'; +import '../preconnect/pre_connect_audio_buffer.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import '../support/disposable.dart'; @@ -135,6 +136,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final Map _textStreamHandlers = {}; + @internal + late final preConnectAudioBuffer = PreConnectAudioBuffer(this); + // for testing @internal Map get rpcHandlers => _rpcHandlers; diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart new file mode 100644 index 000000000..681df9db2 --- /dev/null +++ b/lib/src/core/room_preconnect.dart @@ -0,0 +1,26 @@ +import 'dart:async'; + +import '../logger.dart'; +import '../preconnect/pre_connect_audio_buffer.dart'; +import 'room.dart'; + +extension RoomPreConnect on Room { + /// Wrap an async operation while a pre-connect audio buffer records. + /// Stops and flushes on error. + Future withPreConnectAudio( + Future Function() operation, { + Duration timeout = const Duration(seconds: 10), + PreConnectOnError? onError, + }) async { + await preConnectAudioBuffer.startRecording(timeout: timeout); + try { + final result = await operation(); + return result; + } catch (error) { + logger.warning('[Preconnect] operation failed with error: $error'); + rethrow; + } finally { + await preConnectAudioBuffer.reset(); + } + } +} diff --git a/lib/src/options.dart b/lib/src/options.dart index 1f64401d1..d3873db47 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -326,12 +326,17 @@ class AudioPublishOptions extends PublishOptions { /// max audio bitrate final int audioBitrate; + /// Mark this audio as originating from a pre-connect buffer. + /// Used to populate protobuf audioFeatures (TF_PRECONNECT_BUFFER). + final bool preConnect; + const AudioPublishOptions({ super.name, super.stream, this.dtx = true, this.red = true, this.audioBitrate = AudioPreset.music, + this.preConnect = false, }); AudioPublishOptions copyWith({ @@ -340,6 +345,7 @@ class AudioPublishOptions extends PublishOptions { String? name, String? stream, bool? red, + bool? preConnect, }) => AudioPublishOptions( dtx: dtx ?? this.dtx, @@ -347,11 +353,12 @@ class AudioPublishOptions extends PublishOptions { name: name ?? this.name, stream: stream ?? this.stream, red: red ?? this.red, + preConnect: preConnect ?? this.preConnect, ); @override String toString() => - '${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red})'; + '${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red}, preConnect: ${preConnect})'; } final backupCodecs = ['vp8', 'h264']; diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 823513819..25dd1f3b5 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -127,6 +127,14 @@ class LocalParticipant extends Participant { encryption: room.roomOptions.lkEncryptionType, ); + // Populate audio features (e.g., TF_NO_DTX, TF_PRECONNECT_BUFFER) + req.audioFeatures.addAll([ + if (!publishOptions.dtx) + lk_models.AudioTrackFeature.TF_NO_DTX, + if (publishOptions.preConnect) + lk_models.AudioTrackFeature.TF_PRECONNECT_BUFFER, + ]); + Future negotiate() async { track.transceiver = await room.engine .createTransceiverRTCRtpSender(track, publishOptions!, encodings); diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart new file mode 100644 index 000000000..3b3ba91b5 --- /dev/null +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -0,0 +1,205 @@ +// Copyright 2025 LiveKit, Inc. +// Lightweight pre-connect audio buffer (scaffold). Captures bytes externally +// and uploads via byte stream once an agent is ready. + +import 'dart:async'; +import 'dart:developer'; +import 'dart:typed_data'; + +import 'package:flutter/services.dart'; +import 'package:livekit_client/livekit_client.dart'; +import 'package:uuid/uuid.dart'; + +import '../support/native.dart'; + +typedef PreConnectOnError = void Function(Object error); + +class AudioFrame { + final List data; + final int sampleRate; + final int channelCount; + final int frameLength; + final String format; + + AudioFrame({ + required this.data, + required this.sampleRate, + required this.channelCount, + required this.frameLength, + required this.format, + }); + + factory AudioFrame.fromMap(Map map) => AudioFrame( + data: (map['data'] as List) + .map((channel) => (channel as List).map((e) => e as int).toList() as Int16List) + .toList(), + sampleRate: (map['sampleRate'] as int), + channelCount: (map['channelCount'] as int), + frameLength: (map['frameLength'] as int), + format: map['format'] as String, + ); +} + +class PreConnectAudioBuffer { + static const String dataTopic = 'lk.agent.pre-connect-audio-buffer'; + + static const int defaultMaxSize = 10 * 1024 * 1024; // 10MB + static const int defaultSampleRate = 24000; // Hz + + // Reference to the room + final Room _room; + + // Internal states + bool _isRecording = false; + bool _isSent = false; + String? _rendererId; + + LocalAudioTrack? _localTrack; + EventChannel? _eventChannel; + StreamSubscription? _streamSubscription; + + final PreConnectOnError? _onError; + final int _sampleRate; + + final BytesBuilder _bytes = BytesBuilder(copy: false); + Timer? _timeoutTimer; + CancelListenFunc? _participantStateListener; + CancelListenFunc? _remoteSubscribedListener; + + PreConnectAudioBuffer( + this._room, { + PreConnectOnError? onError, + int sampleRate = defaultSampleRate, + }) : _onError = onError, + _sampleRate = sampleRate; + + // Getters + bool get isRecording => _isRecording; + int get bufferedSize => _bytes.length; + + Future startRecording({ + Duration timeout = const Duration(seconds: 10), + }) async { + if (_isRecording) { + logger.warning('Already recording'); + return; + } + _isRecording = true; + + _localTrack = await LocalAudioTrack.create(); + print('localTrack: ${_localTrack!.mediaStreamTrack.id}'); + + final rendererId = Uuid().v4(); + logger.info('Starting audio renderer with rendererId: $rendererId'); + + final result = await Native.startAudioRenderer( + trackId: _localTrack!.mediaStreamTrack.id!, + rendererId: rendererId, + ); + + _rendererId = rendererId; + + logger.info('startAudioRenderer result: $result'); + + _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); + _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { + try { + // logger.info('event: $event'); + // {sampleRate: 32000, format: int16, frameLength: 320, channelCount: 1} + final dataChannels = event['data'] as List; + final monoData = dataChannels[0].cast(); + _bytes.add(monoData); + log('bufferedSize: ${_bytes.length}'); + } catch (e) { + logger.warning('Error parsing event: $e'); + } + }); + + // Listen for agent readiness; when active, attempt to send buffer once. + _participantStateListener = _room.events.on((event) async { + if (event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active) { + logger.info('Agent is active: ${event.participant.identity}'); + try { + await sendAudioData(agents: [event.participant.identity]); + } catch (e) { + _onError?.call(e); + } finally { + await reset(); + } + } + }); + + _remoteSubscribedListener = _room.events.on((event) async { + logger.info('Remote track subscribed: ${event.trackSid}'); + await stopRecording(); + }); + } + + Future stopRecording() async { + if (!_isRecording) { + logger.warning('Not recording'); + return; + } + _isRecording = false; + + // Cancel the stream subscription. + await _streamSubscription?.cancel(); + _streamSubscription = null; + + // Dispose the event channel. + _eventChannel = null; + + final rendererId = _rendererId; + if (rendererId == null) { + logger.warning('No rendererId'); + return; + } + + await Native.stopAudioRenderer( + rendererId: rendererId, + ); + + _rendererId = null; + } + + // Clean-up & reset for re-use + Future reset() async { + await stopRecording(); + _timeoutTimer?.cancel(); + _participantStateListener?.call(); + _participantStateListener = null; + _remoteSubscribedListener?.call(); + _remoteSubscribedListener = null; + _bytes.clear(); + _localTrack = null; + } + + Future sendAudioData({ + required List agents, + String topic = dataTopic, + }) async { + if (_isSent) return; + if (agents.isEmpty) return; + + final data = _bytes.takeBytes(); + if (data.length <= 1024) { + throw StateError('Audio data too small to send (${data.length} bytes)'); + } + _isSent = true; + + final streamOptions = StreamBytesOptions( + topic: topic, + attributes: { + 'sampleRate': '$_sampleRate', + 'channels': '1', + 'trackId': _localTrack!.mediaStreamTrack.id!, + }, + destinationIdentities: agents, + ); + + final writer = await _room.localParticipant!.streamBytes(streamOptions); + await writer.write(data); + await writer.close(); + logger.info('[preconnect] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio to ${agents.length} agent(s)'); + } +} diff --git a/lib/src/support/native.dart b/lib/src/support/native.dart index c40400885..f7384e3cd 100644 --- a/lib/src/support/native.dart +++ b/lib/src/support/native.dart @@ -91,6 +91,42 @@ class Native { } } + @internal + static Future startAudioRenderer({ + required String trackId, + required String rendererId, + }) async { + try { + final result = await channel.invokeMethod( + 'startAudioRenderer', + { + 'trackId': trackId, + 'rendererId': rendererId, + }, + ); + return result == true; + } catch (error) { + logger.warning('startAudioRenderer did throw $error'); + return false; + } + } + + @internal + static Future stopAudioRenderer({ + required String rendererId, + }) async { + try { + await channel.invokeMethod( + 'stopAudioRenderer', + { + 'rendererId': rendererId, + }, + ); + } catch (error) { + logger.warning('stopAudioRenderer did throw $error'); + } + } + /// Returns OS's version as a string /// Currently only for iOS, macOS @internal diff --git a/pubspec.lock b/pubspec.lock index 858715d7c..4652acd5c 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -141,10 +141,10 @@ packages: dependency: "direct main" description: name: dart_webrtc - sha256: "5b76fd85ac95d6f5dee3e7d7de8d4b51bfbec1dc73804647c6aebb52d6297116" + sha256: a2ae542cdadc21359022adedc26138fa3487cc3b3547c24ff4f556681869e28c url: "https://pub.dev" source: hosted - version: "1.5.3+hotfix.2" + version: "1.5.3+hotfix.4" dbus: dependency: transitive description: @@ -220,10 +220,10 @@ packages: dependency: "direct main" description: name: flutter_webrtc - sha256: dd47ca103b5b6217771e6277882674276d9621bbf9eb23da3c03898b507844e3 + sha256: "69095ba39b83da3de48286dfc0769aa8e9f10491f70058dc8d8ecc960ef7a260" url: "https://pub.dev" source: hosted - version: "0.14.1" + version: "1.0.0" glob: dependency: transitive description: diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index 2d7b7ae41..d550542eb 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -30,22 +30,27 @@ public class AudioRenderer: NSObject { private var channel: FlutterEventChannel? private weak var _track: AudioTrack? - - public init(track: AudioTrack?, + public let rendererId: String + public init(track: AudioTrack, binaryMessenger: FlutterBinaryMessenger, rendererId: String) { _track = track + self.rendererId = rendererId super.init() _track?.add(audioRenderer: self) - let channelName = "io.livekit.audio.renderer/eventchannel-" + rendererId + let channelName = "io.livekit.audio.renderer/channel-" + rendererId channel = FlutterEventChannel(name: channelName, binaryMessenger: binaryMessenger) channel?.setStreamHandler(self) } + func detach() { + _track?.remove(audioRenderer: self) + } + deinit { - _track?.remove(audioRenderer: self) + detach() } } @@ -84,9 +89,9 @@ extension AudioRenderer: RTCAudioRenderer { // Create the result dictionary to send to Flutter var result: [String: Any] = [ - "sampleRate": sampleRate, - "channelCount": channelCount, - "frameLength": frameLength, + "sampleRate": UInt(sampleRate), + "channelCount": UInt(channelCount), + "frameLength": UInt(frameLength), ] // Extract audio data based on the buffer format diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index 53fd115bb..d2719a7fe 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -24,7 +24,7 @@ import WebRTC import UIKit #endif -let trackIdKey = "visualizerId" +let trackIdKey = "trackId" let visualizerIdKey = "visualizerId" let rendererIdKey = "rendererId" @@ -221,7 +221,7 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { } // Already exists - if processors.visualizers[rendererId] != nil { + if processors.renderers[rendererId] != nil { result(true) return } @@ -232,6 +232,8 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { // Retain processors.renderers[rendererId] = renderer + AudioManager.sharedInstance().startLocalRecording() + result(true) } @@ -244,7 +246,10 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { } for processors in audioProcessors.values { - processors.renderers.removeValue(forKey: rendererId) + if let renderer = processors.renderers[rendererId] { + renderer.detach() + processors.renderers.removeValue(forKey: rendererId) + } } result(true) From 8236f39ba030a186c1f5ee159e447eba817e69f6 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Sun, 17 Aug 2025 19:53:03 +0900 Subject: [PATCH 07/37] Improve completer --- lib/src/core/room_preconnect.dart | 1 + .../preconnect/pre_connect_audio_buffer.dart | 56 +++++++--- lib/src/support/completer_manager.dart | 101 ++++++++++++++++++ 3 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 lib/src/support/completer_manager.dart diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart index 681df9db2..023cd0488 100644 --- a/lib/src/core/room_preconnect.dart +++ b/lib/src/core/room_preconnect.dart @@ -15,6 +15,7 @@ extension RoomPreConnect on Room { await preConnectAudioBuffer.startRecording(timeout: timeout); try { final result = await operation(); + await preConnectAudioBuffer.agentReadyFuture; return result; } catch (error) { logger.warning('[Preconnect] operation failed with error: $error'); diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 3b3ba91b5..fa30260d0 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -3,13 +3,13 @@ // and uploads via byte stream once an agent is ready. import 'dart:async'; -import 'dart:developer'; import 'dart:typed_data'; import 'package:flutter/services.dart'; import 'package:livekit_client/livekit_client.dart'; import 'package:uuid/uuid.dart'; +import '../support/completer_manager.dart'; import '../support/native.dart'; typedef PreConnectOnError = void Function(Object error); @@ -59,13 +59,15 @@ class PreConnectAudioBuffer { StreamSubscription? _streamSubscription; final PreConnectOnError? _onError; - final int _sampleRate; + int? _sampleRate; final BytesBuilder _bytes = BytesBuilder(copy: false); Timer? _timeoutTimer; CancelListenFunc? _participantStateListener; CancelListenFunc? _remoteSubscribedListener; + final CompleterManager _agentReadyManager = CompleterManager(); + PreConnectAudioBuffer( this._room, { PreConnectOnError? onError, @@ -77,6 +79,9 @@ class PreConnectAudioBuffer { bool get isRecording => _isRecording; int get bufferedSize => _bytes.length; + /// Future that completes when an agent is ready. + Future get agentReadyFuture => _agentReadyManager.future; + Future startRecording({ Duration timeout = const Duration(seconds: 10), }) async { @@ -86,6 +91,9 @@ class PreConnectAudioBuffer { } _isRecording = true; + // Set up timeout for agent readiness + _agentReadyManager.setTimer(timeout, timeoutReason: 'Agent did not become ready within timeout'); + _localTrack = await LocalAudioTrack.create(); print('localTrack: ${_localTrack!.mediaStreamTrack.id}'); @@ -104,12 +112,13 @@ class PreConnectAudioBuffer { _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { try { - // logger.info('event: $event'); + // logger.info('sampleRate: ${event['sampleRate']}'); // {sampleRate: 32000, format: int16, frameLength: 320, channelCount: 1} + _sampleRate = event['sampleRate'] as int; final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); _bytes.add(monoData); - log('bufferedSize: ${_bytes.length}'); + logger.info('[Preconnect audio] bufferedSize: ${_bytes.length}'); } catch (e) { logger.warning('Error parsing event: $e'); } @@ -117,20 +126,24 @@ class PreConnectAudioBuffer { // Listen for agent readiness; when active, attempt to send buffer once. _participantStateListener = _room.events.on((event) async { + // logger.info('[Preconnect audio] State event ${event}'); + if (event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active) { - logger.info('Agent is active: ${event.participant.identity}'); + logger.info('[Preconnect audio] Agent is active: ${event.participant.identity}'); try { await sendAudioData(agents: [event.participant.identity]); + _agentReadyManager.complete(); } catch (e) { + _agentReadyManager.completeError(e); _onError?.call(e); } finally { - await reset(); + // await reset(); } } }); _remoteSubscribedListener = _room.events.on((event) async { - logger.info('Remote track subscribed: ${event.trackSid}'); + logger.info('[Preconnect audio] Remote track subscribed: ${event.trackSid}'); await stopRecording(); }); } @@ -160,6 +173,11 @@ class PreConnectAudioBuffer { ); _rendererId = null; + + // Complete agent ready future if not already completed + _agentReadyManager.complete(); + + logger.info('[Preconnect audio] stopped recording'); } // Clean-up & reset for re-use @@ -172,6 +190,17 @@ class PreConnectAudioBuffer { _remoteSubscribedListener = null; _bytes.clear(); _localTrack = null; + _agentReadyManager.reset(); + + logger.info('[Preconnect audio] reset'); + } + + /// Dispose the audio buffer and clean up all resources. + void dispose() { + _agentReadyManager.dispose(); + _timeoutTimer?.cancel(); + _participantStateListener?.call(); + _remoteSubscribedListener?.call(); } Future sendAudioData({ @@ -181,16 +210,19 @@ class PreConnectAudioBuffer { if (_isSent) return; if (agents.isEmpty) return; - final data = _bytes.takeBytes(); - if (data.length <= 1024) { - throw StateError('Audio data too small to send (${data.length} bytes)'); + logger.info('[Preconnect audio] sending audio data to ${agents.map((e) => e).join(', ')} agent(s)'); + + if (_sampleRate == null) { + throw StateError('[Preconnect audio] Sample rate is not set'); } + + final data = _bytes.takeBytes(); _isSent = true; final streamOptions = StreamBytesOptions( topic: topic, attributes: { - 'sampleRate': '$_sampleRate', + 'sampleRate': _sampleRate.toString(), 'channels': '1', 'trackId': _localTrack!.mediaStreamTrack.id!, }, @@ -200,6 +232,6 @@ class PreConnectAudioBuffer { final writer = await _room.localParticipant!.streamBytes(streamOptions); await writer.write(data); await writer.close(); - logger.info('[preconnect] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio to ${agents.length} agent(s)'); + logger.info('[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio to ${agents} agent(s)'); } } diff --git a/lib/src/support/completer_manager.dart b/lib/src/support/completer_manager.dart new file mode 100644 index 000000000..5b3b39b6c --- /dev/null +++ b/lib/src/support/completer_manager.dart @@ -0,0 +1,101 @@ +// Copyright 2025 LiveKit, Inc. +// A reusable completer manager that handles safe completion and lifecycle management. + +import 'dart:async'; + +/// A manager for Completer instances that provides safe completion and automatic lifecycle management. +/// +/// Features: +/// - Safe completion (prevents double completion exceptions) +/// - Automatic timeout handling +/// - Clean state management and reusability +/// - Only exposes Future, not the Completer itself +/// - Thread-safe operations +class CompleterManager { + Completer? _completer; + Timer? _timeoutTimer; + bool _isCompleted = false; + + /// Gets the current future. Creates a new completer if none exists or previous one was completed. + Future get future { + if (_completer == null || _isCompleted) { + _reset(); + } + return _completer!.future; + } + + /// Whether the current completer is completed. + bool get isCompleted => _isCompleted; + + /// Whether there's an active completer waiting for completion. + bool get isActive => _completer != null && !_isCompleted; + + /// Completes the current completer with the given value. + /// Returns true if successfully completed, false if already completed. + bool complete([FutureOr? value]) { + if (_completer == null || _isCompleted) { + return false; + } + + _isCompleted = true; + _timeoutTimer?.cancel(); + _timeoutTimer = null; + + _completer!.complete(value); + return true; + } + + /// Completes the current completer with an error. + /// Returns true if successfully completed with error, false if already completed. + bool completeError(Object error, [StackTrace? stackTrace]) { + if (_completer == null || _isCompleted) { + return false; + } + + _isCompleted = true; + _timeoutTimer?.cancel(); + _timeoutTimer = null; + + _completer!.completeError(error, stackTrace); + return true; + } + + /// Sets up a timeout for the current completer. + /// If the completer is not completed within the timeout, it will be completed with a TimeoutException. + void setTimer(Duration timeout, {String? timeoutReason}) { + if (_completer == null || _isCompleted) { + return; + } + + _timeoutTimer?.cancel(); + _timeoutTimer = Timer(timeout, () { + if (!_isCompleted) { + final reason = timeoutReason ?? 'Operation timed out after $timeout'; + completeError(TimeoutException(reason, timeout)); + } + }); + } + + /// Resets the manager, canceling any pending operations and preparing for reuse. + void reset() { + _reset(); + } + + void _reset() { + _timeoutTimer?.cancel(); + _timeoutTimer = null; + _isCompleted = false; + _completer = Completer(); + } + + /// Disposes the manager, canceling any pending operations. + void dispose() { + _timeoutTimer?.cancel(); + _timeoutTimer = null; + if (_completer != null && !_isCompleted) { + _completer!.completeError(StateError('CompleterManager disposed')); + } + _completer = null; + _isCompleted = true; + } +} From 189a2d79e918cb2e8c96098563e2cfbb5bf6651d Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 26 Aug 2025 19:42:08 +0900 Subject: [PATCH 08/37] Fix bytes to read --- shared_swift/AudioRenderer.swift | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index d550542eb..fab32824b 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -101,7 +101,8 @@ extension AudioRenderer: RTCAudioRenderer { for channel in 0 ..< Int(channelCount) { let channelPointer = floatChannelData[channel] - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + let bytesToRead = Int(frameLength) * MemoryLayout.size + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) channelsData.append(channelArray) } @@ -113,7 +114,8 @@ extension AudioRenderer: RTCAudioRenderer { for channel in 0 ..< Int(channelCount) { let channelPointer = int16ChannelData[channel] - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + let bytesToRead = Int(frameLength) * MemoryLayout.size + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) channelsData.append(channelArray) } @@ -125,7 +127,8 @@ extension AudioRenderer: RTCAudioRenderer { for channel in 0 ..< Int(channelCount) { let channelPointer = int32ChannelData[channel] - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + let bytesToRead = Int(frameLength) * MemoryLayout.size + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) channelsData.append(channelArray) } From 9ce8c0d881fb101f60c97e13e2df646ab211a5d2 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Sun, 17 Aug 2025 19:46:42 +0900 Subject: [PATCH 09/37] Logging --- lib/src/events.dart | 2 +- .../preconnect/pre_connect_audio_buffer.dart | 17 +++++++++-- lib/src/types/data_stream.dart | 28 ++++++++++++++++--- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/lib/src/events.dart b/lib/src/events.dart index 31cbbaf09..43fdf1da1 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -382,7 +382,7 @@ class ParticipantStateUpdatedEvent with RoomEvent, ParticipantEvent { }); @override - String toString() => '${runtimeType}(participant: ${participant})'; + String toString() => '${runtimeType}(participant: ${participant}, state: ${state})'; } /// [Pariticpant]'s [ConnectionQuality] has updated. diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index fa30260d0..12b6999ea 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -112,13 +112,13 @@ class PreConnectAudioBuffer { _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { try { - // logger.info('sampleRate: ${event['sampleRate']}'); + // logger.info('[Preconnect audio] event: ${event}'); // {sampleRate: 32000, format: int16, frameLength: 320, channelCount: 1} _sampleRate = event['sampleRate'] as int; final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); _bytes.add(monoData); - logger.info('[Preconnect audio] bufferedSize: ${_bytes.length}'); + logger.info('[Preconnect audio] monoData ${monoData.length}, bufferedSize: ${_bytes.length}'); } catch (e) { logger.warning('Error parsing event: $e'); } @@ -217,6 +217,8 @@ class PreConnectAudioBuffer { } final data = _bytes.takeBytes(); + logger.info('[Preconnect audio] data.length: ${data.length}, bytes.length: ${_bytes.length}'); + _isSent = true; final streamOptions = StreamBytesOptions( @@ -229,9 +231,18 @@ class PreConnectAudioBuffer { destinationIdentities: agents, ); + logger.info('[Preconnect audio] streamOptions: $streamOptions'); + final writer = await _room.localParticipant!.streamBytes(streamOptions); await writer.write(data); await writer.close(); - logger.info('[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio to ${agents} agent(s)'); + + // Compute seconds of audio data sent + final int bytesPerSample = 2; // Assuming 16-bit audio + final int totalSamples = data.length ~/ bytesPerSample; + final double secondsOfAudio = totalSamples / _sampleRate!; + + logger.info( + '[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio (${secondsOfAudio.toStringAsFixed(2)} seconds) to ${agents} agent(s)'); } } diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index d274f1f39..6823d274b 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -61,6 +61,12 @@ class StreamTextOptions { this.totalSize, this.type, }); + + @override + String toString() => '${runtimeType}' + '(topic: $topic, destinationIdentities: $destinationIdentities, ' + 'streamId: $streamId, totalSize: $totalSize, type: $type, version: $version, ' + 'replyToStreamId: $replyToStreamId, attachedStreamIds: $attachedStreamIds)'; } class StreamBytesOptions { @@ -72,6 +78,7 @@ class StreamBytesOptions { String? streamId; int? totalSize; Encryption_Type? encryptionType; + StreamBytesOptions({ this.name, this.mimeType, @@ -82,6 +89,11 @@ class StreamBytesOptions { this.totalSize, this.encryptionType = Encryption_Type.NONE, }); + + @override + String toString() => '${runtimeType}' + '(name: $name, mimeType: $mimeType, topic: $topic, destinationIdentities: $destinationIdentities, ' + 'attributes: $attributes, streamId: $streamId, totalSize: $totalSize, encryptionType: $encryptionType)'; } class ChatMessage { @@ -151,6 +163,11 @@ class ByteStreamInfo extends BaseStreamInfo { size: size, attributes: attributes, ); + + @override + String toString() => '${runtimeType}' + '(name: $name, id: $id, mimeType: $mimeType, topic: $topic, ' + 'timestamp: $timestamp, size: $size, attributes: $attributes)'; } class TextStreamInfo extends BaseStreamInfo { @@ -169,6 +186,11 @@ class TextStreamInfo extends BaseStreamInfo { size: size, attributes: attributes, ); + + @override + String toString() => '${runtimeType}' + '(id: $id, mimeType: $mimeType, topic: $topic, ' + 'timestamp: $timestamp, size: $size, attributes: $attributes)'; } abstract class StreamWriter { @@ -177,8 +199,6 @@ abstract class StreamWriter { Future write(T chunk); } -typedef ByteStreamHandler = void Function( - ByteStreamReader reader, String participantIdentity); +typedef ByteStreamHandler = void Function(ByteStreamReader reader, String participantIdentity); -typedef TextStreamHandler = Function( - TextStreamReader reader, String participantIdentity); +typedef TextStreamHandler = Function(TextStreamReader reader, String participantIdentity); From 8ffc39817723f10f4cb577d1ec782e1355334a9f Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 26 Aug 2025 19:59:25 +0900 Subject: [PATCH 10/37] Patch --- lib/src/core/engine.dart | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 25b371af8..b5cdd2946 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -305,7 +305,11 @@ class Engine extends Disposable with EventsEmittable { if (isBufferStatusLow(kind) == true) { completer.complete(); } else { - onClosing() => completer.completeError('Engine disconnected'); + onClosing() { + if (!completer.isCompleted) { + completer.completeError('Engine disconnected'); + } + } events.once((e) => onClosing()); while (!_dcBufferStatus[kind]!) { From 642366401b8f5d18f85315ac280b0969c727f67c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 26 Aug 2025 22:28:17 +0900 Subject: [PATCH 11/37] audio converter --- ios/Classes/AudioConverter.swift | 1 + .../preconnect/pre_connect_audio_buffer.dart | 12 ++-- lib/src/support/native.dart | 2 + macos/Classes/AudioConverter.swift | 1 + shared_swift/AVAudioPCMBuffer.swift | 22 ++++++ shared_swift/AudioConverter.swift | 68 +++++++++++++++++++ shared_swift/AudioRenderer.swift | 67 +++++++++++------- shared_swift/LiveKitPlugin.swift | 42 +++++++++++- 8 files changed, 186 insertions(+), 29 deletions(-) create mode 120000 ios/Classes/AudioConverter.swift create mode 120000 macos/Classes/AudioConverter.swift create mode 100644 shared_swift/AudioConverter.swift diff --git a/ios/Classes/AudioConverter.swift b/ios/Classes/AudioConverter.swift new file mode 120000 index 000000000..cc657a8ec --- /dev/null +++ b/ios/Classes/AudioConverter.swift @@ -0,0 +1 @@ +../../shared_swift/AudioConverter.swift \ No newline at end of file diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 12b6999ea..dee68ce23 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -59,7 +59,7 @@ class PreConnectAudioBuffer { StreamSubscription? _streamSubscription; final PreConnectOnError? _onError; - int? _sampleRate; + final int _sampleRate; final BytesBuilder _bytes = BytesBuilder(copy: false); Timer? _timeoutTimer; @@ -103,6 +103,11 @@ class PreConnectAudioBuffer { final result = await Native.startAudioRenderer( trackId: _localTrack!.mediaStreamTrack.id!, rendererId: rendererId, + format: { + 'commonFormat': 'int16', + 'sampleRate': _sampleRate, + 'channels': 1, + }, ); _rendererId = rendererId; @@ -112,9 +117,8 @@ class PreConnectAudioBuffer { _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { try { - // logger.info('[Preconnect audio] event: ${event}'); - // {sampleRate: 32000, format: int16, frameLength: 320, channelCount: 1} - _sampleRate = event['sampleRate'] as int; + logger.info('[Preconnect audio] event: ${event}'); + final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); _bytes.add(monoData); diff --git a/lib/src/support/native.dart b/lib/src/support/native.dart index f7384e3cd..506183b28 100644 --- a/lib/src/support/native.dart +++ b/lib/src/support/native.dart @@ -95,6 +95,7 @@ class Native { static Future startAudioRenderer({ required String trackId, required String rendererId, + required Map format, }) async { try { final result = await channel.invokeMethod( @@ -102,6 +103,7 @@ class Native { { 'trackId': trackId, 'rendererId': rendererId, + 'format': format, }, ); return result == true; diff --git a/macos/Classes/AudioConverter.swift b/macos/Classes/AudioConverter.swift new file mode 120000 index 000000000..cc657a8ec --- /dev/null +++ b/macos/Classes/AudioConverter.swift @@ -0,0 +1 @@ +../../shared_swift/AudioConverter.swift \ No newline at end of file diff --git a/shared_swift/AVAudioPCMBuffer.swift b/shared_swift/AVAudioPCMBuffer.swift index ce4ccab49..9694cae25 100644 --- a/shared_swift/AVAudioPCMBuffer.swift +++ b/shared_swift/AVAudioPCMBuffer.swift @@ -18,6 +18,28 @@ import Accelerate import AVFoundation public extension AVAudioPCMBuffer { + /// Copies a range of an AVAudioPCMBuffer. + func copySegment(from startFrame: AVAudioFramePosition, to endFrame: AVAudioFramePosition) -> AVAudioPCMBuffer { + let framesToCopy = AVAudioFrameCount(endFrame - startFrame) + let segment = AVAudioPCMBuffer(pcmFormat: format, frameCapacity: framesToCopy)! + + let sampleSize = format.streamDescription.pointee.mBytesPerFrame + + let srcPtr = UnsafeMutableAudioBufferListPointer(mutableAudioBufferList) + let dstPtr = UnsafeMutableAudioBufferListPointer(segment.mutableAudioBufferList) + for (src, dst) in zip(srcPtr, dstPtr) { + memcpy(dst.mData, src.mData?.advanced(by: Int(startFrame) * Int(sampleSize)), Int(framesToCopy) * Int(sampleSize)) + } + + segment.frameLength = framesToCopy + return segment + } + + /// Copies a full segment from 0 to frameLength. frameCapacity will be equal to frameLength. + func copySegment() -> AVAudioPCMBuffer { + copySegment(from: 0, to: AVAudioFramePosition(frameLength)) + } + func resample(toSampleRate targetSampleRate: Double) -> AVAudioPCMBuffer? { let sourceFormat = format diff --git a/shared_swift/AudioConverter.swift b/shared_swift/AudioConverter.swift new file mode 100644 index 000000000..89bef58ce --- /dev/null +++ b/shared_swift/AudioConverter.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import AVFAudio + +final class AudioConverter: Sendable { + let inputFormat: AVAudioFormat + let outputFormat: AVAudioFormat + + private let converter: AVAudioConverter + private let outputBuffer: AVAudioPCMBuffer + + /// Computes required frame capacity for output buffer. + static func frameCapacity(from inputFormat: AVAudioFormat, to outputFormat: AVAudioFormat, inputFrameCount: AVAudioFrameCount) -> AVAudioFrameCount { + let inputSampleRate = inputFormat.sampleRate + let outputSampleRate = outputFormat.sampleRate + // Compute the output frame capacity based on sample rate ratio + return AVAudioFrameCount(Double(inputFrameCount) * (outputSampleRate / inputSampleRate)) + } + + init?(from inputFormat: AVAudioFormat, to outputFormat: AVAudioFormat, outputBufferCapacity: AVAudioFrameCount = 9600) { + guard let converter = AVAudioConverter(from: inputFormat, to: outputFormat), + let buffer = AVAudioPCMBuffer(pcmFormat: outputFormat, frameCapacity: outputBufferCapacity) + else { + return nil + } + + outputBuffer = buffer + self.converter = converter + self.inputFormat = inputFormat + self.outputFormat = outputFormat + } + + func convert(from inputBuffer: AVAudioPCMBuffer) -> AVAudioPCMBuffer { + var error: NSError? + #if swift(>=6.0) + // Won't be accessed concurrently, marking as nonisolated(unsafe) to avoid Atomics. + nonisolated(unsafe) var bufferFilled = false + #else + var bufferFilled = false + #endif + + converter.convert(to: outputBuffer, error: &error) { _, outStatus in + if bufferFilled { + outStatus.pointee = .noDataNow + return nil + } + outStatus.pointee = .haveData + bufferFilled = true + return inputBuffer + } + + return outputBuffer.copySegment() + } +} diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index fab32824b..2ee5e7d20 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -26,17 +26,27 @@ import WebRTC #endif public class AudioRenderer: NSObject { + + public let rendererId: String + public let format: AVAudioFormat // Target format + private var eventSink: FlutterEventSink? private var channel: FlutterEventChannel? + private var converter: AudioConverter? + + // Weak ref private weak var _track: AudioTrack? - public let rendererId: String + public init(track: AudioTrack, binaryMessenger: FlutterBinaryMessenger, - rendererId: String) + rendererId: String, + format: AVAudioFormat) { _track = track self.rendererId = rendererId + self.format = format + super.init() _track?.add(audioRenderer: self) @@ -66,15 +76,8 @@ extension AudioRenderer: FlutterStreamHandler { } } -extension AudioRenderer: RTCAudioRenderer { - public func render(pcmBuffer: AVAudioPCMBuffer) { - guard let eventSink = eventSink else { return } - - // Extract audio format information - let sampleRate = pcmBuffer.format.sampleRate - let channelCount = pcmBuffer.format.channelCount - let frameLength = pcmBuffer.frameLength - +extension AVAudioPCMBuffer { + public func serialize() -> [String: Any] { // The format of the data: // { // "sampleRate": 48000.0, @@ -89,17 +92,17 @@ extension AudioRenderer: RTCAudioRenderer { // Create the result dictionary to send to Flutter var result: [String: Any] = [ - "sampleRate": UInt(sampleRate), - "channelCount": UInt(channelCount), + "sampleRate": UInt(format.sampleRate), + "channels": UInt(format.channelCount), "frameLength": UInt(frameLength), ] // Extract audio data based on the buffer format - if let floatChannelData = pcmBuffer.floatChannelData { + if let floatChannelData = floatChannelData { // Buffer contains float data var channelsData: [[Float]] = [] - for channel in 0 ..< Int(channelCount) { + for channel in 0 ..< Int(format.channelCount) { let channelPointer = floatChannelData[channel] let bytesToRead = Int(frameLength) * MemoryLayout.size let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) @@ -107,12 +110,12 @@ extension AudioRenderer: RTCAudioRenderer { } result["data"] = channelsData - result["format"] = "float32" - } else if let int16ChannelData = pcmBuffer.int16ChannelData { + result["commonFormat"] = "float32" + } else if let int16ChannelData = int16ChannelData { // Buffer contains int16 data var channelsData: [[Int16]] = [] - for channel in 0 ..< Int(channelCount) { + for channel in 0 ..< Int(format.channelCount) { let channelPointer = int16ChannelData[channel] let bytesToRead = Int(frameLength) * MemoryLayout.size let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) @@ -120,12 +123,12 @@ extension AudioRenderer: RTCAudioRenderer { } result["data"] = channelsData - result["format"] = "int16" - } else if let int32ChannelData = pcmBuffer.int32ChannelData { + result["commonFormat"] = "int16" + } else if let int32ChannelData = int32ChannelData { // Buffer contains int32 data var channelsData: [[Int32]] = [] - for channel in 0 ..< Int(channelCount) { + for channel in 0 ..< Int(format.channelCount) { let channelPointer = int32ChannelData[channel] let bytesToRead = Int(frameLength) * MemoryLayout.size let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) @@ -133,16 +136,32 @@ extension AudioRenderer: RTCAudioRenderer { } result["data"] = channelsData - result["format"] = "int32" + result["commonFormat"] = "int32" } else { // Fallback - send minimal info if no recognizable data format result["data"] = [] - result["format"] = "unknown" + result["commonFormat"] = "unknown" + } + + return result + } +} + +extension AudioRenderer: RTCAudioRenderer { + public func render(pcmBuffer: AVAudioPCMBuffer) { + guard let eventSink = eventSink else { return } + + // Create or update converter if needed + if converter == nil || pcmBuffer.format != converter!.inputFormat || format != converter!.outputFormat { + converter = AudioConverter(from: pcmBuffer.format, to: format) } + + let convertedBuffer = converter!.convert(from: pcmBuffer) + let serializedBuffer = convertedBuffer.serialize() // Send the result to Flutter on the main thread DispatchQueue.main.async { - eventSink(result) + eventSink(serializedBuffer) } } } diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index d2719a7fe..aa2905431 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -27,6 +27,11 @@ import WebRTC let trackIdKey = "trackId" let visualizerIdKey = "visualizerId" let rendererIdKey = "rendererId" +let formatKey = "format" + +let commonFormatKey = "commonFormat" +let sampleRateKey = "sampleRate" +let channelsKey = "channels" class AudioProcessors { var track: AudioTrack @@ -200,11 +205,45 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { result(true) } + public func parseAudioFormat(args: [String: Any?]) -> AVAudioFormat? { + guard let commonFormatString = args[commonFormatKey] as? String, + let sampleRate = args[sampleRateKey] as? Double, + let channels = args[channelsKey] as? AVAudioChannelCount else { + return nil + } + + let commonFormat: AVAudioCommonFormat + switch commonFormatString { + case "float32": + commonFormat = .pcmFormatFloat32 + case "int16": + commonFormat = .pcmFormatInt16 + case "int32": + commonFormat = .pcmFormatInt32 + default: + return nil + } + + return AVAudioFormat(commonFormat: commonFormat, sampleRate: sampleRate, channels: channels, interleaved: false) + } + public func handleStartAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) { // Required params let trackId = args[trackIdKey] as? String let rendererId = args[rendererIdKey] as? String + let formatMap = args[formatKey] as? [String: Any?] + + guard let formatMap else { + result(FlutterError(code: formatKey, message: "\(formatKey) is required", details: nil)) + return + } + + guard let format = parseAudioFormat(args: formatMap) else { + result(FlutterError(code: formatKey, message: "Failed to parse format", details: nil)) + return + } + guard let trackId else { result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil)) return @@ -228,7 +267,8 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { let renderer = AudioRenderer(track: processors.track, binaryMessenger: binaryMessenger!, - rendererId: rendererId) + rendererId: rendererId, + format: format) // Retain processors.renderers[rendererId] = renderer From f2ca756d15dcd12bc8b06ec6b84a5a8e9521b32c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 1 Sep 2025 21:14:38 +0800 Subject: [PATCH 12/37] Update pubspec.lock --- pubspec.lock | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pubspec.lock b/pubspec.lock index 9991059dc..f5c0b19f6 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -268,10 +268,10 @@ packages: dependency: "direct main" description: name: flutter_webrtc - sha256: "69095ba39b83da3de48286dfc0769aa8e9f10491f70058dc8d8ecc960ef7a260" + sha256: "945d0a38b90fbca8257eadb167d8fb9fa7075d9a1939fd2953c10054454d1de2" url: "https://pub.dev" source: hosted - version: "1.0.0" + version: "1.1.0" frontend_server_client: dependency: transitive description: @@ -384,6 +384,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.0.0" + logger: + dependency: transitive + description: + name: logger + sha256: "55d6c23a6c15db14920e037fe7e0dc32e7cdaf3b64b4b25df2d541b5b6b81c0c" + url: "https://pub.dev" + source: hosted + version: "2.6.1" logging: dependency: "direct main" description: From 0d5b67e0bffcb56fd4fba4b1b66a32ea6c3186d4 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 1 Sep 2025 22:32:25 +0800 Subject: [PATCH 13/37] Fix buffer data --- .../preconnect/pre_connect_audio_buffer.dart | 29 ------------------- shared_swift/AudioRenderer.swift | 9 ++---- 2 files changed, 3 insertions(+), 35 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index dee68ce23..289b0b35f 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -14,32 +14,6 @@ import '../support/native.dart'; typedef PreConnectOnError = void Function(Object error); -class AudioFrame { - final List data; - final int sampleRate; - final int channelCount; - final int frameLength; - final String format; - - AudioFrame({ - required this.data, - required this.sampleRate, - required this.channelCount, - required this.frameLength, - required this.format, - }); - - factory AudioFrame.fromMap(Map map) => AudioFrame( - data: (map['data'] as List) - .map((channel) => (channel as List).map((e) => e as int).toList() as Int16List) - .toList(), - sampleRate: (map['sampleRate'] as int), - channelCount: (map['channelCount'] as int), - frameLength: (map['frameLength'] as int), - format: map['format'] as String, - ); -} - class PreConnectAudioBuffer { static const String dataTopic = 'lk.agent.pre-connect-audio-buffer'; @@ -117,12 +91,9 @@ class PreConnectAudioBuffer { _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { try { - logger.info('[Preconnect audio] event: ${event}'); - final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); _bytes.add(monoData); - logger.info('[Preconnect audio] monoData ${monoData.length}, bufferedSize: ${_bytes.length}'); } catch (e) { logger.warning('Error parsing event: $e'); } diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index 2ee5e7d20..9b5806fe3 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -104,8 +104,7 @@ extension AVAudioPCMBuffer { for channel in 0 ..< Int(format.channelCount) { let channelPointer = floatChannelData[channel] - let bytesToRead = Int(frameLength) * MemoryLayout.size - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) channelsData.append(channelArray) } @@ -117,8 +116,7 @@ extension AVAudioPCMBuffer { for channel in 0 ..< Int(format.channelCount) { let channelPointer = int16ChannelData[channel] - let bytesToRead = Int(frameLength) * MemoryLayout.size - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) channelsData.append(channelArray) } @@ -130,8 +128,7 @@ extension AVAudioPCMBuffer { for channel in 0 ..< Int(format.channelCount) { let channelPointer = int32ChannelData[channel] - let bytesToRead = Int(frameLength) * MemoryLayout.size - let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: bytesToRead)) + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) channelsData.append(channelArray) } From f1ec7c7c013dca7c3740a3568c99388f6e8019e9 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 2 Sep 2025 12:38:36 +0800 Subject: [PATCH 14/37] Remove check --- lib/src/preconnect/pre_connect_audio_buffer.dart | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 289b0b35f..e078f9b58 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -187,10 +187,6 @@ class PreConnectAudioBuffer { logger.info('[Preconnect audio] sending audio data to ${agents.map((e) => e).join(', ')} agent(s)'); - if (_sampleRate == null) { - throw StateError('[Preconnect audio] Sample rate is not set'); - } - final data = _bytes.takeBytes(); logger.info('[Preconnect audio] data.length: ${data.length}, bytes.length: ${_bytes.length}'); From 36f15a0ff2c1f144924637d908a0c30fa3b89bbf Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:39:31 +0800 Subject: [PATCH 15/37] Fix buffer format --- lib/src/preconnect/pre_connect_audio_buffer.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index e078f9b58..537fed3be 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -93,7 +93,9 @@ class PreConnectAudioBuffer { try { final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); - _bytes.add(monoData); + // Convert Int16 values to bytes using typed data view + final int16List = Int16List.fromList(monoData); + _bytes.add(int16List.buffer.asUint8List()); } catch (e) { logger.warning('Error parsing event: $e'); } From 58b6b6dd5ce81da1e40c5d41083d8a4ebdf7c44b Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 2 Sep 2025 16:55:55 +0800 Subject: [PATCH 16/37] Use only full frames --- shared_swift/AudioRenderer.swift | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index 9b5806fe3..24fbd1dc4 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -154,6 +154,12 @@ extension AudioRenderer: RTCAudioRenderer { } let convertedBuffer = converter!.convert(from: pcmBuffer) + + guard convertedBuffer.frameLength == UInt32(format.sampleRate / 100) else { + print("Converted buffer frame length does not match target format sample rate: \(convertedBuffer.frameLength) != \(format.sampleRate / 100) skipping this frame...") + return + } + let serializedBuffer = convertedBuffer.serialize() // Send the result to Flutter on the main thread From d1a77d36723febe33e5f298f94645530a2749f9b Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 2 Sep 2025 16:57:00 +0800 Subject: [PATCH 17/37] Minor adjustments --- .../preconnect/pre_connect_audio_buffer.dart | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 537fed3be..e2d3e0591 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -3,6 +3,7 @@ // and uploads via byte stream once an agent is ready. import 'dart:async'; +import 'dart:math'; import 'dart:typed_data'; import 'package:flutter/services.dart'; @@ -19,6 +20,7 @@ class PreConnectAudioBuffer { static const int defaultMaxSize = 10 * 1024 * 1024; // 10MB static const int defaultSampleRate = 24000; // Hz + static const int defaultChunkSize = 64 * 1024; // 64KB chunks for streaming // Reference to the room final Room _room; @@ -35,7 +37,7 @@ class PreConnectAudioBuffer { final PreConnectOnError? _onError; final int _sampleRate; - final BytesBuilder _bytes = BytesBuilder(copy: false); + final BytesBuilder _buffer = BytesBuilder(copy: false); Timer? _timeoutTimer; CancelListenFunc? _participantStateListener; CancelListenFunc? _remoteSubscribedListener; @@ -51,7 +53,7 @@ class PreConnectAudioBuffer { // Getters bool get isRecording => _isRecording; - int get bufferedSize => _bytes.length; + int get bufferedSize => _buffer.length; /// Future that completes when an agent is ready. Future get agentReadyFuture => _agentReadyManager.future; @@ -95,9 +97,10 @@ class PreConnectAudioBuffer { final monoData = dataChannels[0].cast(); // Convert Int16 values to bytes using typed data view final int16List = Int16List.fromList(monoData); - _bytes.add(int16List.buffer.asUint8List()); + final bytes = int16List.buffer.asUint8List(); + _buffer.add(bytes); } catch (e) { - logger.warning('Error parsing event: $e'); + logger.warning('[Preconnect audio] Error parsing event: $e'); } }); @@ -165,7 +168,7 @@ class PreConnectAudioBuffer { _participantStateListener = null; _remoteSubscribedListener?.call(); _remoteSubscribedListener = null; - _bytes.clear(); + _buffer.clear(); _localTrack = null; _agentReadyManager.reset(); @@ -189,8 +192,8 @@ class PreConnectAudioBuffer { logger.info('[Preconnect audio] sending audio data to ${agents.map((e) => e).join(', ')} agent(s)'); - final data = _bytes.takeBytes(); - logger.info('[Preconnect audio] data.length: ${data.length}, bytes.length: ${_bytes.length}'); + final data = _buffer.takeBytes(); + logger.info('[Preconnect audio] data.length: ${data.length}, bytes.length: ${_buffer.length}'); _isSent = true; @@ -201,6 +204,7 @@ class PreConnectAudioBuffer { 'channels': '1', 'trackId': _localTrack!.mediaStreamTrack.id!, }, + totalSize: data.length, destinationIdentities: agents, ); @@ -213,7 +217,7 @@ class PreConnectAudioBuffer { // Compute seconds of audio data sent final int bytesPerSample = 2; // Assuming 16-bit audio final int totalSamples = data.length ~/ bytesPerSample; - final double secondsOfAudio = totalSamples / _sampleRate!; + final double secondsOfAudio = totalSamples / _sampleRate; logger.info( '[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio (${secondsOfAudio.toStringAsFixed(2)} seconds) to ${agents} agent(s)'); From 8f7d984e39dbc44cddc7f45c79d810f5ab8bbbd2 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 2 Sep 2025 17:45:04 +0800 Subject: [PATCH 18/37] Format AudioRenderer --- shared_swift/AudioRenderer.swift | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift index 24fbd1dc4..5afc3afaf 100644 --- a/shared_swift/AudioRenderer.swift +++ b/shared_swift/AudioRenderer.swift @@ -26,7 +26,6 @@ import WebRTC #endif public class AudioRenderer: NSObject { - public let rendererId: String public let format: AVAudioFormat // Target format @@ -56,7 +55,7 @@ public class AudioRenderer: NSObject { } func detach() { - _track?.remove(audioRenderer: self) + _track?.remove(audioRenderer: self) } deinit { @@ -76,8 +75,8 @@ extension AudioRenderer: FlutterStreamHandler { } } -extension AVAudioPCMBuffer { - public func serialize() -> [String: Any] { +public extension AVAudioPCMBuffer { + func serialize() -> [String: Any] { // The format of the data: // { // "sampleRate": 48000.0, @@ -152,7 +151,7 @@ extension AudioRenderer: RTCAudioRenderer { if converter == nil || pcmBuffer.format != converter!.inputFormat || format != converter!.outputFormat { converter = AudioConverter(from: pcmBuffer.format, to: format) } - + let convertedBuffer = converter!.convert(from: pcmBuffer) guard convertedBuffer.frameLength == UInt32(format.sampleRate / 100) else { From fe2f6bd050334c0674ddbb2748d736a494137ca8 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 5 Sep 2025 17:46:31 +0800 Subject: [PATCH 19/37] Pre-connect logic for room --- lib/src/core/room.dart | 13 ++++++++++++- lib/src/preconnect/pre_connect_audio_buffer.dart | 4 +--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 126ea4c50..5ad585d7f 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -417,7 +417,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { }); void _setUpEngineListeners() => _engineListener - ..on((event) { + ..on((event) async { _roomInfo = event.response.room; _name = event.response.room.name; _metadata = event.response.room.metadata; @@ -442,6 +442,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _localParticipant!.updateFromInfo(event.response.participant); } + // Check if preconnect buffer is recording and publish its track + if (preConnectAudioBuffer.isRecording && preConnectAudioBuffer.localTrack != null) { + logger.info('Publishing preconnect audio track'); + await _localParticipant!.publishAudioTrack( + preConnectAudioBuffer.localTrack!, + publishOptions: roomOptions.defaultAudioPublishOptions.copyWith(preConnect: true), + ); + } + if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index && engine.fastConnectOptions != null && !engine.fullReconnectOnNext) { @@ -449,6 +458,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { var audio = options.microphone; bool audioEnabled = audio.enabled == true || audio.track != null; + + // Only enable microphone if preconnect buffer is not active if (audioEnabled) { if (audio.track != null) { _localParticipant!.publishAudioTrack(audio.track as LocalAudioTrack, diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index e2d3e0591..d6aaf7abd 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -3,7 +3,6 @@ // and uploads via byte stream once an agent is ready. import 'dart:async'; -import 'dart:math'; import 'dart:typed_data'; import 'package:flutter/services.dart'; @@ -54,6 +53,7 @@ class PreConnectAudioBuffer { // Getters bool get isRecording => _isRecording; int get bufferedSize => _buffer.length; + LocalAudioTrack? get localTrack => _localTrack; /// Future that completes when an agent is ready. Future get agentReadyFuture => _agentReadyManager.future; @@ -116,8 +116,6 @@ class PreConnectAudioBuffer { } catch (e) { _agentReadyManager.completeError(e); _onError?.call(e); - } finally { - // await reset(); } } }); From 15d85fb51250112e24bead211724a61bc15f176c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 5 Sep 2025 17:52:16 +0800 Subject: [PATCH 20/37] Silence concurrency warnings --- shared_swift/AudioConverter.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared_swift/AudioConverter.swift b/shared_swift/AudioConverter.swift index 89bef58ce..f4f470922 100644 --- a/shared_swift/AudioConverter.swift +++ b/shared_swift/AudioConverter.swift @@ -14,7 +14,7 @@ * limitations under the License. */ -import AVFAudio +@preconcurrency import AVFAudio final class AudioConverter: Sendable { let inputFormat: AVAudioFormat From c5fe16fb56605b96af61c34d858447f2e0ea78bf Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 5 Sep 2025 19:42:09 +0800 Subject: [PATCH 21/37] Start local audio on task --- shared_swift/LiveKitPlugin.swift | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index aa2905431..f8ba49f56 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -272,9 +272,11 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { // Retain processors.renderers[rendererId] = renderer - AudioManager.sharedInstance().startLocalRecording() - - result(true) + // Run on Task to unblock main thread + Task { + let admResult = AudioManager.sharedInstance().startLocalRecording() + result(admResult) + } } public func handleStopAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) { From 4c5e1ad338227023ef0f98b11b373b38d992124a Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 13:05:59 +0800 Subject: [PATCH 22/37] Don't publish option audio if pre-audio is used --- lib/src/core/room.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 857e84830..067e0959d 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -458,9 +458,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable { var audio = options.microphone; bool audioEnabled = audio.enabled == true || audio.track != null; - + // Only enable microphone if preconnect buffer is not active - if (audioEnabled) { + if (audioEnabled && !preConnectAudioBuffer.isRecording) { if (audio.track != null) { _localParticipant!.publishAudioTrack(audio.track as LocalAudioTrack, publishOptions: roomOptions.defaultAudioPublishOptions); From 48726825c331564706e219371faf7253f210b250 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:16:35 +0800 Subject: [PATCH 23/37] Local track publish listener --- .../preconnect/pre_connect_audio_buffer.dart | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index d6aaf7abd..b2b0024fd 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -55,6 +55,8 @@ class PreConnectAudioBuffer { int get bufferedSize => _buffer.length; LocalAudioTrack? get localTrack => _localTrack; + Future? _localTrackPublishedEvent; + /// Future that completes when an agent is ready. Future get agentReadyFuture => _agentReadyManager.future; @@ -105,21 +107,24 @@ class PreConnectAudioBuffer { }); // Listen for agent readiness; when active, attempt to send buffer once. - _participantStateListener = _room.events.on((event) async { - // logger.info('[Preconnect audio] State event ${event}'); - - if (event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active) { - logger.info('[Preconnect audio] Agent is active: ${event.participant.identity}'); - try { - await sendAudioData(agents: [event.participant.identity]); - _agentReadyManager.complete(); - } catch (e) { - _agentReadyManager.completeError(e); - _onError?.call(e); - } + _participantStateListener = _room.events.on( + filter: (event) => event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active, + (event) async { + logger.info('[Preconnect audio] Agent is active: ${event.participant.identity}'); + try { + await sendAudioData(agents: [event.participant.identity]); + _agentReadyManager.complete(); + } catch (error) { + _agentReadyManager.completeError(error); + _onError?.call(error); } }); + _localTrackPublishedEvent = _room.events.waitFor( + duration: Duration(seconds: 10), + filter: (event) => event.participant == _room.localParticipant, + ); + _remoteSubscribedListener = _room.events.on((event) async { logger.info('[Preconnect audio] Remote track subscribed: ${event.trackSid}'); await stopRecording(); @@ -169,6 +174,7 @@ class PreConnectAudioBuffer { _buffer.clear(); _localTrack = null; _agentReadyManager.reset(); + _localTrackPublishedEvent = null; logger.info('[Preconnect audio] reset'); } @@ -188,6 +194,16 @@ class PreConnectAudioBuffer { if (_isSent) return; if (agents.isEmpty) return; + // Wait for local track published event + final localTrackPublishedEvent = await _localTrackPublishedEvent; + logger.info('[Preconnect audio] localTrackPublishedEvent: $localTrackPublishedEvent'); + + final localTrackSid = localTrackPublishedEvent?.publication.track?.sid; + if (localTrackSid == null) { + logger.severe('[Preconnect audio] localTrackPublishedEvent is null'); + return; + } + logger.info('[Preconnect audio] sending audio data to ${agents.map((e) => e).join(', ')} agent(s)'); final data = _buffer.takeBytes(); @@ -200,7 +216,7 @@ class PreConnectAudioBuffer { attributes: { 'sampleRate': _sampleRate.toString(), 'channels': '1', - 'trackId': _localTrack!.mediaStreamTrack.id!, + 'trackId': localTrackSid, }, totalSize: data.length, destinationIdentities: agents, From 060d776f8dd2427b4e5cea30cfb824ae1809ca1c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:23:57 +0800 Subject: [PATCH 24/37] Simplify --- lib/src/preconnect/pre_connect_audio_buffer.dart | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index b2b0024fd..1c94e39d9 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -39,7 +39,6 @@ class PreConnectAudioBuffer { final BytesBuilder _buffer = BytesBuilder(copy: false); Timer? _timeoutTimer; CancelListenFunc? _participantStateListener; - CancelListenFunc? _remoteSubscribedListener; final CompleterManager _agentReadyManager = CompleterManager(); @@ -124,11 +123,6 @@ class PreConnectAudioBuffer { duration: Duration(seconds: 10), filter: (event) => event.participant == _room.localParticipant, ); - - _remoteSubscribedListener = _room.events.on((event) async { - logger.info('[Preconnect audio] Remote track subscribed: ${event.trackSid}'); - await stopRecording(); - }); } Future stopRecording() async { @@ -169,8 +163,6 @@ class PreConnectAudioBuffer { _timeoutTimer?.cancel(); _participantStateListener?.call(); _participantStateListener = null; - _remoteSubscribedListener?.call(); - _remoteSubscribedListener = null; _buffer.clear(); _localTrack = null; _agentReadyManager.reset(); @@ -179,12 +171,11 @@ class PreConnectAudioBuffer { logger.info('[Preconnect audio] reset'); } - /// Dispose the audio buffer and clean up all resources. + // Dispose the audio buffer and clean up all resources. void dispose() { _agentReadyManager.dispose(); _timeoutTimer?.cancel(); _participantStateListener?.call(); - _remoteSubscribedListener?.call(); } Future sendAudioData({ From 841c54a9c8ce6915472ba81876ef945637bc2e96 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 17:56:51 +0800 Subject: [PATCH 25/37] Events --- lib/src/events.dart | 30 +++++++++++++++++++ .../preconnect/pre_connect_audio_buffer.dart | 23 ++++++++++++-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/lib/src/events.dart b/lib/src/events.dart index 200792a3b..5255162a4 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -589,3 +589,33 @@ class TrackProcessorUpdateEvent with TrackEvent { String toString() => '${runtimeType}' 'track: ${track})'; } + +/// Pre-connect audio buffer has started recording. +/// Emitted by [Room]. +class PreConnectAudioBufferStartedEvent with RoomEvent { + final int sampleRate; + final Duration timeout; + const PreConnectAudioBufferStartedEvent({ + required this.sampleRate, + required this.timeout, + }); + + @override + String toString() => '${runtimeType}' + '(sampleRate: ${sampleRate}, timeout: ${timeout})'; +} + +/// Pre-connect audio buffer has stopped recording. +/// Emitted by [Room]. +class PreConnectAudioBufferStoppedEvent with RoomEvent { + final int bufferedSize; + final bool isDataSent; + const PreConnectAudioBufferStoppedEvent({ + required this.bufferedSize, + required this.isDataSent, + }); + + @override + String toString() => '${runtimeType}' + '(bufferedSize: ${bufferedSize}, isDataSent: ${isDataSent})'; +} diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 1c94e39d9..eea3cf502 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -6,11 +6,18 @@ import 'dart:async'; import 'dart:typed_data'; import 'package:flutter/services.dart'; -import 'package:livekit_client/livekit_client.dart'; import 'package:uuid/uuid.dart'; +import '../core/room.dart'; +import '../events.dart'; +import '../logger.dart'; +import '../participant/local.dart'; import '../support/completer_manager.dart'; import '../support/native.dart'; +import '../track/local/audio.dart'; +import '../types/data_stream.dart'; +import '../types/other.dart'; +import '../types/participant_state.dart'; typedef PreConnectOnError = void Function(Object error); @@ -60,7 +67,7 @@ class PreConnectAudioBuffer { Future get agentReadyFuture => _agentReadyManager.future; Future startRecording({ - Duration timeout = const Duration(seconds: 10), + Duration timeout = const Duration(seconds: 20), }) async { if (_isRecording) { logger.warning('Already recording'); @@ -123,6 +130,12 @@ class PreConnectAudioBuffer { duration: Duration(seconds: 10), filter: (event) => event.participant == _room.localParticipant, ); + + // Emit the started event + _room.events.emit(PreConnectAudioBufferStartedEvent( + sampleRate: _sampleRate, + timeout: timeout, + )); } Future stopRecording() async { @@ -154,6 +167,12 @@ class PreConnectAudioBuffer { // Complete agent ready future if not already completed _agentReadyManager.complete(); + // Emit the stopped event + _room.events.emit(PreConnectAudioBufferStoppedEvent( + bufferedSize: _buffer.length, + isDataSent: _isSent, + )); + logger.info('[Preconnect audio] stopped recording'); } From 708c5119d0b46ca6a0e95bd357bbf53337307a35 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 18:43:02 +0800 Subject: [PATCH 26/37] F1 --- .../preconnect/pre_connect_audio_buffer.dart | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index eea3cf502..3c04e0f6d 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -183,18 +183,33 @@ class PreConnectAudioBuffer { _participantStateListener?.call(); _participantStateListener = null; _buffer.clear(); + + // Don't stop the local track - it will continue to be used by the Room _localTrack = null; + _agentReadyManager.reset(); _localTrackPublishedEvent = null; + // Reset the _isSent flag to allow data sending on next use + _isSent = false; + logger.info('[Preconnect audio] reset'); } // Dispose the audio buffer and clean up all resources. - void dispose() { + Future dispose() async { + // Ensure we stop recording first + await stopRecording(); + + // Don't stop the local track - it will continue to be used by the Room + _localTrack = null; + _agentReadyManager.dispose(); _timeoutTimer?.cancel(); _participantStateListener?.call(); + _participantStateListener = null; + + logger.info('[Preconnect audio] disposed'); } Future sendAudioData({ From c4205a4580bdb9039f29ad5feee1e45c5bea57e2 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 18:48:42 +0800 Subject: [PATCH 27/37] Dispose buffer --- lib/src/core/room.dart | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 6b686e9a0..df71e4137 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -137,7 +137,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final Map _textStreamHandlers = {}; @internal - late final preConnectAudioBuffer = PreConnectAudioBuffer(this); + late final PreConnectAudioBuffer preConnectAudioBuffer; // for testing @internal @@ -175,6 +175,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _setupDataStreamListeners(); + preConnectAudioBuffer = PreConnectAudioBuffer(this); + onDispose(() async { // clean up routine await _cleanUp(); @@ -975,6 +977,8 @@ extension RoomPrivateMethods on Room { _activeSpeakers.clear(); + await preConnectAudioBuffer.dispose(); + // clean up engine await engine.cleanUp(); From 1a8edb7eca3c83a9c56a46f8bc415778180507d5 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 18:55:32 +0800 Subject: [PATCH 28/37] Fix stop audio renderer --- lib/src/preconnect/pre_connect_audio_buffer.dart | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 3c04e0f6d..eafb16165 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -153,15 +153,12 @@ class PreConnectAudioBuffer { _eventChannel = null; final rendererId = _rendererId; - if (rendererId == null) { - logger.warning('No rendererId'); - return; + if (rendererId != null) { + await Native.stopAudioRenderer( + rendererId: rendererId, + ); } - await Native.stopAudioRenderer( - rendererId: rendererId, - ); - _rendererId = null; // Complete agent ready future if not already completed From e9e676d894099998828363eda9447a47c94cf932 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 19:23:38 +0800 Subject: [PATCH 29/37] fix clean up --- lib/src/core/room.dart | 6 +++-- lib/src/events.dart | 6 ++--- .../preconnect/pre_connect_audio_buffer.dart | 22 +++++-------------- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index df71e4137..dd55dbeba 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -180,6 +180,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { onDispose(() async { // clean up routine await _cleanUp(); + // dispose preConnectAudioBuffer + await preConnectAudioBuffer.dispose(); // dispose events await events.dispose(); // dispose local participant @@ -452,7 +454,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { publishOptions: roomOptions.defaultAudioPublishOptions.copyWith(preConnect: true), ); } - + if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index && engine.fastConnectOptions != null && !engine.fullReconnectOnNext) { @@ -977,7 +979,7 @@ extension RoomPrivateMethods on Room { _activeSpeakers.clear(); - await preConnectAudioBuffer.dispose(); + await preConnectAudioBuffer.reset(); // clean up engine await engine.cleanUp(); diff --git a/lib/src/events.dart b/lib/src/events.dart index 5255162a4..4dda14096 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -609,13 +609,13 @@ class PreConnectAudioBufferStartedEvent with RoomEvent { /// Emitted by [Room]. class PreConnectAudioBufferStoppedEvent with RoomEvent { final int bufferedSize; - final bool isDataSent; + final bool isBufferSent; const PreConnectAudioBufferStoppedEvent({ required this.bufferedSize, - required this.isDataSent, + required this.isBufferSent, }); @override String toString() => '${runtimeType}' - '(bufferedSize: ${bufferedSize}, isDataSent: ${isDataSent})'; + '(bufferedSize: ${bufferedSize}, isDataSent: ${isBufferSent})'; } diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index eafb16165..ddb66be8f 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -33,7 +33,7 @@ class PreConnectAudioBuffer { // Internal states bool _isRecording = false; - bool _isSent = false; + bool _isBufferSent = false; String? _rendererId; LocalAudioTrack? _localTrack; @@ -167,7 +167,7 @@ class PreConnectAudioBuffer { // Emit the stopped event _room.events.emit(PreConnectAudioBufferStoppedEvent( bufferedSize: _buffer.length, - isDataSent: _isSent, + isBufferSent: _isBufferSent, )); logger.info('[Preconnect audio] stopped recording'); @@ -188,24 +188,14 @@ class PreConnectAudioBuffer { _localTrackPublishedEvent = null; // Reset the _isSent flag to allow data sending on next use - _isSent = false; + _isBufferSent = false; logger.info('[Preconnect audio] reset'); } // Dispose the audio buffer and clean up all resources. Future dispose() async { - // Ensure we stop recording first - await stopRecording(); - - // Don't stop the local track - it will continue to be used by the Room - _localTrack = null; - - _agentReadyManager.dispose(); - _timeoutTimer?.cancel(); - _participantStateListener?.call(); - _participantStateListener = null; - + await reset(); logger.info('[Preconnect audio] disposed'); } @@ -213,7 +203,7 @@ class PreConnectAudioBuffer { required List agents, String topic = dataTopic, }) async { - if (_isSent) return; + if (_isBufferSent) return; if (agents.isEmpty) return; // Wait for local track published event @@ -231,7 +221,7 @@ class PreConnectAudioBuffer { final data = _buffer.takeBytes(); logger.info('[Preconnect audio] data.length: ${data.length}, bytes.length: ${_buffer.length}'); - _isSent = true; + _isBufferSent = true; final streamOptions = StreamBytesOptions( topic: topic, From 9d5f93d76844bed084332b1f8eac4236e5e2518d Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 8 Sep 2025 20:20:27 +0800 Subject: [PATCH 30/37] Android --- .../io/livekit/plugin/AudioProcessors.kt | 39 +++ .../kotlin/io/livekit/plugin/AudioRenderer.kt | 299 ++++++++++++++++++ .../kotlin/io/livekit/plugin/LiveKitPlugin.kt | 159 +++++++++- 3 files changed, 481 insertions(+), 16 deletions(-) create mode 100644 android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt create mode 100644 android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt diff --git a/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt b/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt new file mode 100644 index 000000000..743939fa1 --- /dev/null +++ b/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.plugin + +/** + * Container for managing audio processors (renderers and visualizers) for a specific audio track + * Similar to iOS AudioProcessors implementation + */ +class AudioProcessors( + val track: LKAudioTrack +) { + val renderers = mutableMapOf() + val visualizers = mutableMapOf() + + /** + * Clean up all processors and release resources + */ + fun cleanup() { + renderers.values.forEach { it.detach() } + renderers.clear() + + visualizers.values.forEach { it.stop() } + visualizers.clear() + } +} diff --git a/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt b/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt new file mode 100644 index 000000000..735bb6fef --- /dev/null +++ b/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt @@ -0,0 +1,299 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.plugin + +import android.os.Handler +import android.os.Looper +import io.flutter.plugin.common.BinaryMessenger +import io.flutter.plugin.common.EventChannel +import org.webrtc.AudioTrackSink +import java.nio.ByteBuffer +import java.nio.ByteOrder + +/** + * AudioRenderer for capturing audio data from WebRTC tracks and streaming to Flutter + * Similar to iOS AudioRenderer implementation + */ +class AudioRenderer( + private val audioTrack: LKAudioTrack, + private val binaryMessenger: BinaryMessenger, + private val rendererId: String, + private val targetFormat: RendererAudioFormat +) : EventChannel.StreamHandler, AudioTrackSink { + + private var eventChannel: EventChannel? = null + private var eventSink: EventChannel.EventSink? = null + private var isAttached = false + + private val handler: Handler by lazy { + Handler(Looper.getMainLooper()) + } + + init { + val channelName = "io.livekit.audio.renderer/channel-$rendererId" + eventChannel = EventChannel(binaryMessenger, channelName) + eventChannel?.setStreamHandler(this) + + // Attach to the audio track + audioTrack.addSink(this) + isAttached = true + } + + fun detach() { + if (isAttached) { + audioTrack.removeSink(this) + isAttached = false + } + eventChannel?.setStreamHandler(null) + eventSink = null + } + + override fun onListen(arguments: Any?, events: EventChannel.EventSink?) { + eventSink = events + } + + override fun onCancel(arguments: Any?) { + eventSink = null + } + + override fun onData( + audioData: ByteBuffer, + bitsPerSample: Int, + sampleRate: Int, + numberOfChannels: Int, + numberOfFrames: Int, + absoluteCaptureTimestampMs: Long + ) { + eventSink?.let { sink -> + try { + // Convert audio data to the target format + val convertedData = convertAudioData( + audioData, + bitsPerSample, + sampleRate, + numberOfChannels, + numberOfFrames + ) + + // Send to Flutter on the main thread + handler.post { + sink.success(convertedData) + } + } catch (e: Exception) { + handler.post { + sink.error( + "AUDIO_CONVERSION_ERROR", + "Failed to convert audio data: ${e.message}", + null + ) + } + } + } + } + + private fun convertAudioData( + audioData: ByteBuffer, + bitsPerSample: Int, + sampleRate: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): Map { + // Create result similar to iOS implementation + val result = mutableMapOf( + "sampleRate" to sampleRate, + "channels" to numberOfChannels, + "frameLength" to numberOfFrames + ) + + // Convert based on target format + when (targetFormat.commonFormat) { + "int16" -> { + result["commonFormat"] = "int16" + result["data"] = + convertToInt16(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + + "float32" -> { + result["commonFormat"] = "float32" + result["data"] = + convertToFloat32(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + + else -> { + result["commonFormat"] = "int16" // Default fallback + result["data"] = + convertToInt16(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + } + + return result + } + + private fun convertToInt16( + audioData: ByteBuffer, + bitsPerSample: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): List> { + val channelsData = mutableListOf>() + + // Prepare buffer for reading + val buffer = audioData.duplicate() + buffer.order(ByteOrder.LITTLE_ENDIAN) + buffer.rewind() + + when (bitsPerSample) { + 16 -> { + // Already 16-bit, just reformat by channels + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf() + buffer.position(0) // Start from beginning for each channel + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 2 + + if (byteIndex + 1 < buffer.capacity()) { + buffer.position(byteIndex) + val sample = buffer.short.toInt() + channelData.add(sample) + } + } + channelsData.add(channelData) + } + } + + 32 -> { + // Convert from 32-bit to 16-bit + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 4 + + if (byteIndex + 3 < buffer.capacity()) { + buffer.position(byteIndex) + val sample32 = buffer.int + // Convert 32-bit to 16-bit by right-shifting + val sample16 = (sample32 shr 16).toShort().toInt() + channelData.add(sample16) + } + } + channelsData.add(channelData) + } + } + + else -> { + // Unsupported format, return empty data + repeat(numberOfChannels) { + channelsData.add(emptyList()) + } + } + } + + return channelsData + } + + private fun convertToFloat32( + audioData: ByteBuffer, + bitsPerSample: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): List> { + val channelsData = mutableListOf>() + + val buffer = audioData.duplicate() + buffer.order(ByteOrder.LITTLE_ENDIAN) + buffer.rewind() + + when (bitsPerSample) { + 16 -> { + // Convert from 16-bit to float32 + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 2 + + if (byteIndex + 1 < buffer.capacity()) { + buffer.position(byteIndex) + val sample16 = buffer.short + // Convert to float (-1.0 to 1.0) + val sampleFloat = sample16.toFloat() / Short.MAX_VALUE + channelData.add(sampleFloat) + } + } + channelsData.add(channelData) + } + } + + 32 -> { + // Assume 32-bit float input + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 4 + + if (byteIndex + 3 < buffer.capacity()) { + buffer.position(byteIndex) + val sampleFloat = buffer.float + channelData.add(sampleFloat) + } + } + channelsData.add(channelData) + } + } + + else -> { + // Unsupported format + repeat(numberOfChannels) { + channelsData.add(emptyList()) + } + } + } + + return channelsData + } +} + +/** + * Audio format specification for the renderer + */ +data class RendererAudioFormat( + val bitsPerSample: Int, + val sampleRate: Int, + val numberOfChannels: Int, + val commonFormat: String = "int16" +) { + companion object { + fun fromMap(formatMap: Map): RendererAudioFormat? { + val bitsPerSample = formatMap["bitsPerSample"] as? Int ?: 16 + val sampleRate = formatMap["sampleRate"] as? Int ?: 48000 + val numberOfChannels = formatMap["channels"] as? Int ?: 1 + val commonFormat = formatMap["commonFormat"] as? String ?: "int16" + + return RendererAudioFormat(bitsPerSample, sampleRate, numberOfChannels, commonFormat) + } + } +} diff --git a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt index 079988f83..7401ed041 100644 --- a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt +++ b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt @@ -31,15 +31,17 @@ import io.flutter.plugin.common.BinaryMessenger import org.webrtc.AudioTrack /** LiveKitPlugin */ -class LiveKitPlugin: FlutterPlugin, MethodCallHandler { +class LiveKitPlugin : FlutterPlugin, MethodCallHandler { private var processors = mutableMapOf() + private var audioProcessors = mutableMapOf() private var flutterWebRTCPlugin = FlutterWebRTCPlugin.sharedSingleton private var binaryMessenger: BinaryMessenger? = null + /// The MethodChannel that will the communication between Flutter and native Android /// /// This local reference serves to register the plugin with the Flutter Engine and unregister it /// when the Flutter Engine is detached from the Activity - private lateinit var channel : MethodChannel + private lateinit var channel: MethodChannel override fun onAttachedToEngine(@NonNull flutterPluginBinding: FlutterPlugin.FlutterPluginBinding) { channel = MethodChannel(flutterPluginBinding.binaryMessenger, "livekit_client") @@ -65,21 +67,22 @@ class LiveKitPlugin: FlutterPlugin, MethodCallHandler { audioTrack = LKLocalAudioTrack(track as LocalAudioTrack) } else { val remoteTrack = flutterWebRTCPlugin.getRemoteTrack(trackId) - if (remoteTrack != null) { - audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) - } + if (remoteTrack != null) { + audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) + } } - if(audioTrack == null) { + if (audioTrack == null) { result.error("INVALID_ARGUMENT", "track not found", null) return } val visualizer = Visualizer( - barCount = barCount, isCentered = isCentered, + barCount = barCount, isCentered = isCentered, smoothTransition = smoothTransition, audioTrack = audioTrack, binaryMessenger = binaryMessenger!!, - visualizerId = visualizerId) + visualizerId = visualizerId + ) processors[visualizerId] = visualizer result.success(null) @@ -93,7 +96,7 @@ class LiveKitPlugin: FlutterPlugin, MethodCallHandler { return } processors.forEach { (k, visualizer) -> - if(k == visualizerId) { + if (k == visualizerId) { visualizer.stop() } } @@ -101,19 +104,143 @@ class LiveKitPlugin: FlutterPlugin, MethodCallHandler { result.success(null) } - override fun onMethodCall(@NonNull call: MethodCall, @NonNull result: Result) { - if(call.method == "startVisualizer") { - handleStartVisualizer(call, result) + /** + * Get or create AudioProcessors for a given trackId + */ + private fun getAudioProcessors(trackId: String): AudioProcessors? { + // Return existing if found + audioProcessors[trackId]?.let { return it } + + // Create new AudioProcessors for this track + var audioTrack: LKAudioTrack? = null + + val localTrack = flutterWebRTCPlugin.getLocalTrack(trackId) + if (localTrack != null) { + audioTrack = LKLocalAudioTrack(localTrack as LocalAudioTrack) + } else { + val remoteTrack = flutterWebRTCPlugin.getRemoteTrack(trackId) + if (remoteTrack != null) { + audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) + } + } + + return audioTrack?.let { track -> + val processors = AudioProcessors(track) + audioProcessors[trackId] = processors + processors + } + } + + /** + * Handle startAudioRenderer method call + */ + private fun handleStartAudioRenderer(@NonNull call: MethodCall, @NonNull result: Result) { + val trackId = call.argument("trackId") + val rendererId = call.argument("rendererId") + val formatMap = call.argument>("format") + + if (trackId == null) { + result.error("INVALID_ARGUMENT", "trackId is required", null) + return + } + + if (rendererId == null) { + result.error("INVALID_ARGUMENT", "rendererId is required", null) + return + } + + if (formatMap == null) { + result.error("INVALID_ARGUMENT", "format is required", null) return - } else if(call.method == "stopVisualizer") { - handleStopVisualizer(call, result) + } + + val format = RendererAudioFormat.fromMap(formatMap) + if (format == null) { + result.error("INVALID_ARGUMENT", "Failed to parse format", null) return } - // no-op for now - result.notImplemented() + + val processors = getAudioProcessors(trackId) + if (processors == null) { + result.error("INVALID_ARGUMENT", "No such track", null) + return + } + + // Check if renderer already exists + if (processors.renderers[rendererId] != null) { + result.success(true) + return + } + + try { + val renderer = AudioRenderer( + processors.track, + binaryMessenger!!, + rendererId, + format + ) + + processors.renderers[rendererId] = renderer + result.success(true) + } catch (e: Exception) { + result.error("RENDERER_ERROR", "Failed to create audio renderer: ${e.message}", null) + } + } + + /** + * Handle stopAudioRenderer method call + */ + private fun handleStopAudioRenderer(@NonNull call: MethodCall, @NonNull result: Result) { + val rendererId = call.argument("rendererId") + + if (rendererId == null) { + result.error("INVALID_ARGUMENT", "rendererId is required", null) + return + } + + // Find and remove renderer from all processors + for (processors in audioProcessors.values) { + processors.renderers[rendererId]?.let { renderer -> + renderer.detach() + processors.renderers.remove(rendererId) + } + } + + result.success(true) + } + + override fun onMethodCall(@NonNull call: MethodCall, @NonNull result: Result) { + when (call.method) { + "startVisualizer" -> { + handleStartVisualizer(call, result) + } + + "stopVisualizer" -> { + handleStopVisualizer(call, result) + } + + "startAudioRenderer" -> { + handleStartAudioRenderer(call, result) + } + + "stopAudioRenderer" -> { + handleStopAudioRenderer(call, result) + } + + else -> { + result.notImplemented() + } + } } override fun onDetachedFromEngine(@NonNull binding: FlutterPlugin.FlutterPluginBinding) { channel.setMethodCallHandler(null) + + // Cleanup all processors + processors.values.forEach { it.stop() } + processors.clear() + + audioProcessors.values.forEach { it.cleanup() } + audioProcessors.clear() } } From 7ec5c4a2d8f440f83d8d58e4cf0706dad893ac97 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 9 Sep 2025 00:10:07 +0800 Subject: [PATCH 31/37] Move start loc recording to rtc --- shared_swift/LiveKitPlugin.swift | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index f8ba49f56..c73c293ef 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -272,11 +272,7 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { // Retain processors.renderers[rendererId] = renderer - // Run on Task to unblock main thread - Task { - let admResult = AudioManager.sharedInstance().startLocalRecording() - result(admResult) - } + result(true) } public func handleStopAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) { From 3136206393a0f7fc7ce9025470d29912dc741230 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:36:03 +0800 Subject: [PATCH 32/37] Use rendered sample rate --- .../preconnect/pre_connect_audio_buffer.dart | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index ddb66be8f..a82fc223e 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:typed_data'; import 'package:flutter/services.dart'; +import 'package:flutter_webrtc/flutter_webrtc.dart' as webrtc; import 'package:uuid/uuid.dart'; import '../core/room.dart'; @@ -41,7 +42,8 @@ class PreConnectAudioBuffer { StreamSubscription? _streamSubscription; final PreConnectOnError? _onError; - final int _sampleRate; + final int _requestSampleRate; + int? _renderedSampleRate; final BytesBuilder _buffer = BytesBuilder(copy: false); Timer? _timeoutTimer; @@ -54,7 +56,7 @@ class PreConnectAudioBuffer { PreConnectOnError? onError, int sampleRate = defaultSampleRate, }) : _onError = onError, - _sampleRate = sampleRate; + _requestSampleRate = sampleRate; // Getters bool get isRecording => _isRecording; @@ -89,11 +91,13 @@ class PreConnectAudioBuffer { rendererId: rendererId, format: { 'commonFormat': 'int16', - 'sampleRate': _sampleRate, + 'sampleRate': _requestSampleRate, 'channels': 1, }, ); + await webrtc.NativeAudioManagement.startLocalRecording(); + _rendererId = rendererId; logger.info('startAudioRenderer result: $result'); @@ -101,6 +105,8 @@ class PreConnectAudioBuffer { _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { try { + // Actual sample rate of the audio data, can differ from the request sample rate + _renderedSampleRate = event['sampleRate'] as int; final dataChannels = event['data'] as List; final monoData = dataChannels[0].cast(); // Convert Int16 values to bytes using typed data view @@ -133,7 +139,7 @@ class PreConnectAudioBuffer { // Emit the started event _room.events.emit(PreConnectAudioBufferStartedEvent( - sampleRate: _sampleRate, + sampleRate: _requestSampleRate, timeout: timeout, )); } @@ -206,6 +212,12 @@ class PreConnectAudioBuffer { if (_isBufferSent) return; if (agents.isEmpty) return; + final sampleRate = _renderedSampleRate; + if (sampleRate == null) { + logger.severe('[Preconnect audio] renderedSampleRate is null'); + return; + } + // Wait for local track published event final localTrackPublishedEvent = await _localTrackPublishedEvent; logger.info('[Preconnect audio] localTrackPublishedEvent: $localTrackPublishedEvent'); @@ -226,7 +238,7 @@ class PreConnectAudioBuffer { final streamOptions = StreamBytesOptions( topic: topic, attributes: { - 'sampleRate': _sampleRate.toString(), + 'sampleRate': sampleRate.toString(), 'channels': '1', 'trackId': localTrackSid, }, @@ -243,7 +255,7 @@ class PreConnectAudioBuffer { // Compute seconds of audio data sent final int bytesPerSample = 2; // Assuming 16-bit audio final int totalSamples = data.length ~/ bytesPerSample; - final double secondsOfAudio = totalSamples / _sampleRate; + final double secondsOfAudio = totalSamples / sampleRate; logger.info( '[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio (${secondsOfAudio.toStringAsFixed(2)} seconds) to ${agents} agent(s)'); From bd3c511e0324545d1bd99a0ab3f9c06b86db8f0e Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 9 Sep 2025 16:03:10 +0800 Subject: [PATCH 33/37] Clean up android logic --- .../kotlin/io/livekit/plugin/LiveKitPlugin.kt | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt index 7401ed041..88c73c658 100644 --- a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt +++ b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt @@ -32,7 +32,6 @@ import org.webrtc.AudioTrack /** LiveKitPlugin */ class LiveKitPlugin : FlutterPlugin, MethodCallHandler { - private var processors = mutableMapOf() private var audioProcessors = mutableMapOf() private var flutterWebRTCPlugin = FlutterWebRTCPlugin.sharedSingleton private var binaryMessenger: BinaryMessenger? = null @@ -57,34 +56,33 @@ class LiveKitPlugin : FlutterPlugin, MethodCallHandler { result.error("INVALID_ARGUMENT", "trackId and visualizerId is required", null) return } - var audioTrack: LKAudioTrack? = null + val barCount = call.argument("barCount") ?: 7 val isCentered = call.argument("isCentered") ?: true var smoothTransition = call.argument("smoothTransition") ?: true - val track = flutterWebRTCPlugin.getLocalTrack(trackId) - if (track != null) { - audioTrack = LKLocalAudioTrack(track as LocalAudioTrack) - } else { - val remoteTrack = flutterWebRTCPlugin.getRemoteTrack(trackId) - if (remoteTrack != null) { - audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) - } + val processors = getAudioProcessors(trackId) + if (processors == null) { + result.error("INVALID_ARGUMENT", "track not found", null) + return } - if (audioTrack == null) { - result.error("INVALID_ARGUMENT", "track not found", null) + // Check if visualizer already exists + if (processors.visualizers[visualizerId] != null) { + result.success(null) return } val visualizer = Visualizer( - barCount = barCount, isCentered = isCentered, + barCount = barCount, + isCentered = isCentered, smoothTransition = smoothTransition, - audioTrack = audioTrack, binaryMessenger = binaryMessenger!!, + audioTrack = processors.track, + binaryMessenger = binaryMessenger!!, visualizerId = visualizerId ) - processors[visualizerId] = visualizer + processors.visualizers[visualizerId] = visualizer result.success(null) } @@ -95,12 +93,15 @@ class LiveKitPlugin : FlutterPlugin, MethodCallHandler { result.error("INVALID_ARGUMENT", "trackId and visualizerId is required", null) return } - processors.forEach { (k, visualizer) -> - if (k == visualizerId) { + + // Find and remove visualizer from all processors + for (processors in audioProcessors.values) { + processors.visualizers[visualizerId]?.let { visualizer -> visualizer.stop() + processors.visualizers.remove(visualizerId) } } - processors.entries.removeAll { (k, v) -> k == visualizerId } + result.success(null) } @@ -177,7 +178,7 @@ class LiveKitPlugin : FlutterPlugin, MethodCallHandler { processors.track, binaryMessenger!!, rendererId, - format + format, ) processors.renderers[rendererId] = renderer @@ -237,9 +238,6 @@ class LiveKitPlugin : FlutterPlugin, MethodCallHandler { channel.setMethodCallHandler(null) // Cleanup all processors - processors.values.forEach { it.stop() } - processors.clear() - audioProcessors.values.forEach { it.cleanup() } audioProcessors.clear() } From 4413fe5f321dcb8fb673b722e86678d7a04afece Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 11 Sep 2025 11:37:01 +0800 Subject: [PATCH 34/37] Update headers --- lib/src/core/room_preconnect.dart | 14 ++++++++++++++ lib/src/preconnect/pre_connect_audio_buffer.dart | 14 ++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart index 023cd0488..ce3b7a9ae 100644 --- a/lib/src/core/room_preconnect.dart +++ b/lib/src/core/room_preconnect.dart @@ -1,3 +1,17 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import 'dart:async'; import '../logger.dart'; diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index a82fc223e..18b9485a0 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -1,6 +1,16 @@ // Copyright 2025 LiveKit, Inc. -// Lightweight pre-connect audio buffer (scaffold). Captures bytes externally -// and uploads via byte stream once an agent is ready. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. import 'dart:async'; import 'dart:typed_data'; From f890929d54e588796ff52861d1bb732fb226728b Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 11 Sep 2025 11:37:15 +0800 Subject: [PATCH 35/37] Stop native audio on error --- lib/src/core/room_preconnect.dart | 1 + lib/src/preconnect/pre_connect_audio_buffer.dart | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart index ce3b7a9ae..8876d530a 100644 --- a/lib/src/core/room_preconnect.dart +++ b/lib/src/core/room_preconnect.dart @@ -32,6 +32,7 @@ extension RoomPreConnect on Room { await preConnectAudioBuffer.agentReadyFuture; return result; } catch (error) { + await preConnectAudioBuffer.stopRecording(withError: error); logger.warning('[Preconnect] operation failed with error: $error'); rethrow; } finally { diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart index 18b9485a0..4449f18dc 100644 --- a/lib/src/preconnect/pre_connect_audio_buffer.dart +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -154,11 +154,8 @@ class PreConnectAudioBuffer { )); } - Future stopRecording() async { - if (!_isRecording) { - logger.warning('Not recording'); - return; - } + Future stopRecording({Object? withError}) async { + if (!_isRecording) return; _isRecording = false; // Cancel the stream subscription. @@ -177,6 +174,11 @@ class PreConnectAudioBuffer { _rendererId = null; + // Stop native audio when errored + if (withError != null) { + await webrtc.NativeAudioManagement.stopLocalRecording(); + } + // Complete agent ready future if not already completed _agentReadyManager.complete(); From 6d9192383d6bdd83183646a2a2d63d884a9eb406 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 11 Sep 2025 11:53:56 +0800 Subject: [PATCH 36/37] Completer manager tests --- lib/src/support/completer_manager.dart | 13 +- test/support/completer_manager_test.dart | 406 +++++++++++++++++++++++ 2 files changed, 418 insertions(+), 1 deletion(-) create mode 100644 test/support/completer_manager_test.dart diff --git a/lib/src/support/completer_manager.dart b/lib/src/support/completer_manager.dart index 5b3b39b6c..8900f177e 100644 --- a/lib/src/support/completer_manager.dart +++ b/lib/src/support/completer_manager.dart @@ -1,5 +1,16 @@ // Copyright 2025 LiveKit, Inc. -// A reusable completer manager that handles safe completion and lifecycle management. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. import 'dart:async'; diff --git a/test/support/completer_manager_test.dart b/test/support/completer_manager_test.dart new file mode 100644 index 000000000..540f66655 --- /dev/null +++ b/test/support/completer_manager_test.dart @@ -0,0 +1,406 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:livekit_client/src/support/completer_manager.dart'; + +void main() { + group('CompleterManager', () { + late CompleterManager manager; + + setUp(() { + manager = CompleterManager(); + }); + + tearDown(() { + // Only dispose if not already completed or disposed + try { + if (manager.isActive) { + manager.complete('teardown'); + } + manager.dispose(); + } catch (_) { + // Already disposed, ignore + } + }); + + group('Basic Functionality', () { + test('should provide a future when accessed', () async { + final future = manager.future; + expect(future, isA>()); + expect(manager.isActive, isTrue); + expect(manager.isCompleted, isFalse); + + // Complete it to avoid tearDown issues + manager.complete('test'); + await expectLater(future, completion('test')); + }); + + test('should complete successfully with value', () async { + final future = manager.future; + final result = manager.complete('success'); + + expect(result, isTrue); + expect(manager.isCompleted, isTrue); + expect(manager.isActive, isFalse); + await expectLater(future, completion('success')); + }); + + test('should complete successfully without value', () async { + final manager = CompleterManager(); + final future = manager.future; + final result = manager.complete(); + + expect(result, isTrue); + expect(manager.isCompleted, isTrue); + await expectLater(future, completion(isNull)); + manager.dispose(); + }); + + test('should complete with error', () async { + final future = manager.future; + final testError = Exception('test error'); + final result = manager.completeError(testError); + + expect(result, isTrue); + expect(manager.isCompleted, isTrue); + expect(manager.isActive, isFalse); + await expectLater(future, throwsA(testError)); + }); + + test('should complete with error and stack trace', () async { + final future = manager.future; + final testError = Exception('test error'); + final stackTrace = StackTrace.current; + final result = manager.completeError(testError, stackTrace); + + expect(result, isTrue); + expect(manager.isCompleted, isTrue); + + try { + await future; + fail('Should have thrown an error'); + } catch (error, trace) { + expect(error, equals(testError)); + expect(trace, equals(stackTrace)); + } + }); + + test('should return false when completing already completed manager', () { + manager.complete('first'); + final result1 = manager.complete('second'); + final result2 = manager.completeError(Exception('error')); + + expect(result1, isFalse); + expect(result2, isFalse); + }); + }); + + group('State Properties', () { + test('initial state should be inactive and not completed', () { + expect(manager.isActive, isFalse); + expect(manager.isCompleted, isFalse); + }); + + test('should be active after accessing future', () async { + final future = manager.future; + expect(manager.isActive, isTrue); + expect(manager.isCompleted, isFalse); + + // Complete it to avoid tearDown issues + manager.complete('test'); + await expectLater(future, completion('test')); + }); + + test('should be completed after completion', () async { + final future = manager.future; + manager.complete('done'); + + expect(manager.isActive, isFalse); + expect(manager.isCompleted, isTrue); + await expectLater(future, completion('done')); + }); + + test('should be completed after error completion', () async { + final future = manager.future; + final testError = Exception('error'); + manager.completeError(testError); + + expect(manager.isActive, isFalse); + expect(manager.isCompleted, isTrue); + await expectLater(future, throwsA(testError)); + }); + }); + + group('Reusability', () { + test('should create new future after previous completion', () async { + // First use + final future1 = manager.future; + manager.complete('first'); + await expectLater(future1, completion('first')); + + // Second use - should get new future + final future2 = manager.future; + expect(future2, isNot(same(future1))); + expect(manager.isActive, isTrue); + expect(manager.isCompleted, isFalse); + + manager.complete('second'); + await expectLater(future2, completion('second')); + }); + + test('should reset and be reusable', () async { + // First use + final future1 = manager.future; + manager.complete('first'); + await expectLater(future1, completion('first')); + + // Reset - note that reset creates a new completer, so it's not active until future is accessed + manager.reset(); + expect(manager.isCompleted, isFalse); + // After reset, manager is ready but not active until future is accessed + + // Second use after reset + final future2 = manager.future; + expect(manager.isActive, isTrue); + manager.complete('second'); + await expectLater(future2, completion('second')); + }); + + test('should reset even when active', () async { + final future1 = manager.future; + expect(manager.isActive, isTrue); + + manager.reset(); + expect(manager.isCompleted, isFalse); + // After reset, manager is ready but not active until future is accessed + + final future2 = manager.future; + expect(manager.isActive, isTrue); + expect(future2, isNot(same(future1))); + + // Complete it to avoid tearDown issues + manager.complete('test'); + await expectLater(future2, completion('test')); + }); + }); + + group('Timeout Functionality', () { + test('should timeout with default message', () async { + final future = manager.future; + manager.setTimer(Duration(milliseconds: 10)); + + await expectLater( + future, + throwsA(isA()), + ); + expect(manager.isCompleted, isTrue); + }); + + test('should timeout with custom message', () async { + final future = manager.future; + const customMessage = 'Custom timeout message'; + manager.setTimer(Duration(milliseconds: 10), timeoutReason: customMessage); + + try { + await future; + fail('Should have thrown TimeoutException'); + } catch (error) { + expect(error, isA()); + expect((error as TimeoutException).message, contains(customMessage)); + } + }); + + test('should cancel timeout on manual completion', () async { + final future = manager.future; + manager.setTimer(Duration(milliseconds: 100)); + + // Complete before timeout + manager.complete('completed'); + await expectLater(future, completion('completed')); + + // Wait longer than timeout to ensure it was cancelled + await Future.delayed(Duration(milliseconds: 150)); + // If we get here without additional errors, timeout was cancelled + }); + + test('should cancel timeout on error completion', () async { + final future = manager.future; + manager.setTimer(Duration(milliseconds: 100)); + + // Complete with error before timeout + final testError = Exception('test error'); + manager.completeError(testError); + await expectLater(future, throwsA(testError)); + + // Wait longer than timeout to ensure it was cancelled + await Future.delayed(Duration(milliseconds: 150)); + // If we get here without additional errors, timeout was cancelled + }); + + test('should replace previous timeout when setting new one', () async { + final future = manager.future; + manager.setTimer(Duration(milliseconds: 200)); + manager.setTimer(Duration(milliseconds: 10)); // This should replace the previous one + + await expectLater( + future, + throwsA(isA()), + ); + }); + + test('should not set timeout on completed manager', () async { + final future = manager.future; + manager.complete('done'); + await expectLater(future, completion('done')); + + // This should not throw or affect anything + manager.setTimer(Duration(milliseconds: 10)); + + // Verify still completed + expect(manager.isCompleted, isTrue); + }); + + test('should not set timeout when no active completer', () { + // Should not throw + manager.setTimer(Duration(milliseconds: 10)); + expect(manager.isActive, isFalse); + }); + }); + + group('Disposal', () { + test('should complete with error when disposed while active', () async { + final future = manager.future; + expect(manager.isActive, isTrue); + + manager.dispose(); + + await expectLater( + future, + throwsA(isA()), + ); + expect(manager.isCompleted, isTrue); + }); + + test('should not affect already completed manager', () async { + final future = manager.future; + manager.complete('done'); + await expectLater(future, completion('done')); + + // Dispose should not throw or change state + manager.dispose(); + expect(manager.isCompleted, isTrue); + }); + + test('should cancel timeout on dispose', () async { + final future = manager.future; + manager.setTimer(Duration(milliseconds: 10)); + + manager.dispose(); + + // Should complete with StateError, not TimeoutException + await expectLater( + future, + throwsA(isA()), + ); + }); + + test('should not allow operations after dispose', () { + manager.dispose(); + + final result1 = manager.complete('test'); + final result2 = manager.completeError(Exception('error')); + + expect(result1, isFalse); + expect(result2, isFalse); + expect(manager.isCompleted, isTrue); + }); + }); + + group('Edge Cases', () { + test('should handle multiple future accesses for same completer', () async { + final future1 = manager.future; + final future2 = manager.future; + + expect(identical(future1, future2), isTrue); + expect(manager.isActive, isTrue); + + // Complete it to avoid tearDown issues + manager.complete('test'); + await expectLater(future1, completion('test')); + }); + + test('should handle rapid complete/reset cycles', () async { + for (int i = 0; i < 5; i++) { + final future = manager.future; + manager.complete('value_$i'); + await expectLater(future, completion('value_$i')); + if (i < 4) { // Don't reset on the last iteration + manager.reset(); + } + } + }); + + test('should work with different generic types', () async { + final intManager = CompleterManager(); + final intFuture = intManager.future; + intManager.complete(42); + await expectLater(intFuture, completion(42)); + intManager.dispose(); + + final boolManager = CompleterManager(); + final boolFuture = boolManager.future; + boolManager.complete(true); + await expectLater(boolFuture, completion(isTrue)); + boolManager.dispose(); + }); + + test('should handle Future values in complete', () async { + final future = manager.future; + final futureValue = Future.value('async_result'); + manager.complete(futureValue); + + await expectLater(future, completion('async_result')); + }); + }); + + group('Thread Safety', () { + test('should handle concurrent operations safely', () async { + final futures = []; + + // Start multiple concurrent operations + for (int i = 0; i < 10; i++) { + futures.add(Future(() async { + final future = manager.future; + if (i == 0) { + // Only the first one should succeed in completing + await Future.delayed(Duration(milliseconds: 1)); + manager.complete('winner'); + } + return future; + })); + } + + final results = await Future.wait(futures, eagerError: false); + + // All should complete with the same value + for (final result in results) { + expect(result, equals('winner')); + } + }); + }); + }); +} From 403fa702176aa85abf3e66b1ca978ba4024884dd Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 1 Oct 2025 12:54:59 +0800 Subject: [PATCH 37/37] Update pubspec.lock --- pubspec.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pubspec.lock b/pubspec.lock index 8567da2a1..ba94b337b 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -5,18 +5,18 @@ packages: dependency: transitive description: name: _fe_analyzer_shared - sha256: f0bb5d1648339c8308cc0b9838d8456b3cfe5c91f9dc1a735b4d003269e5da9a + sha256: dd3d2ad434b9510001d089e8de7556d50c834481b9abc2891a0184a8493a19dc url: "https://pub.dev" source: hosted - version: "88.0.0" + version: "89.0.0" analyzer: dependency: transitive description: name: analyzer - sha256: "0b7b9c329d2879f8f05d6c05b32ee9ec025f39b077864bdb5ac9a7b63418a98f" + sha256: c22b6e7726d1f9e5db58c7251606076a71ca0dbcf76116675edfadbec0c9e875 url: "https://pub.dev" source: hosted - version: "8.1.1" + version: "8.2.0" args: dependency: transitive description: @@ -654,5 +654,5 @@ packages: source: hosted version: "3.1.3" sdks: - dart: ">=3.8.0 <4.0.0" + dart: ">=3.9.0 <4.0.0" flutter: ">=3.29.0"