diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 085842f5..8475a900 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1340,7 +1340,7 @@ extension DataStreamRoomMethods on Room { ? streamHeader.textHeader.generated : false, operationType: streamHeader.textHeader.hasOperationType() - ? streamHeader.textHeader.operationType.name + ? TextStreamOperationType.fromPBType(streamHeader.textHeader.operationType) : null, ); diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 2dfcc3b8..e37c772b 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -1152,22 +1152,6 @@ extension RPCMethods on LocalParticipant { } } -/// Helper function to convert string operation type to enum -lk_models.DataStream_OperationType _stringToOperationType(String? type) { - switch (type?.toLowerCase()) { - case 'create': - return lk_models.DataStream_OperationType.CREATE; - case 'update': - return lk_models.DataStream_OperationType.UPDATE; - case 'delete': - return lk_models.DataStream_OperationType.DELETE; - case 'reaction': - return lk_models.DataStream_OperationType.REACTION; - default: - return lk_models.DataStream_OperationType.CREATE; - } -} - extension DataStreamParticipantMethods on LocalParticipant { Future sendText(String text, {SendTextOptions? options}) async { @@ -1259,7 +1243,7 @@ extension DataStreamParticipantMethods on LocalParticipant { attachedStreamIds: options?.attachedStreamIds, replyToStreamId: options?.replyToStreamId, generated: options?.generated ?? false, - operationType: _stringToOperationType(options?.type), + operationType: options?.type?.toPBType(), ), ); diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index 213760e9..6263697a 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:io' show File; import '../data_stream/stream_reader.dart'; +import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_models.pb.dart' show Encryption_Type, DataStream_Chunk; const kStreamChunkSize = 15_000; @@ -53,7 +54,7 @@ class StreamTextOptions { int? totalSize; /// 'create' | 'update' | 'delete' | 'reaction' - String? type; + TextStreamOperationType? type; /// true if the text has been generated by an agent from a participant's audio transcription bool generated; @@ -165,6 +166,43 @@ class ByteStreamInfo extends BaseStreamInfo { ); } +/// Operation types for text streams +enum TextStreamOperationType { + create, + update, + delete, + reaction; + + static TextStreamOperationType? fromPBType(lk_models.DataStream_OperationType? type) { + if (type == null) return TextStreamOperationType.create; + switch (type) { + case lk_models.DataStream_OperationType.CREATE: + return TextStreamOperationType.create; + case lk_models.DataStream_OperationType.UPDATE: + return TextStreamOperationType.update; + case lk_models.DataStream_OperationType.DELETE: + return TextStreamOperationType.delete; + case lk_models.DataStream_OperationType.REACTION: + return TextStreamOperationType.reaction; + default: + return null; + } + } + + lk_models.DataStream_OperationType toPBType() { + switch (this) { + case TextStreamOperationType.create: + return lk_models.DataStream_OperationType.CREATE; + case TextStreamOperationType.update: + return lk_models.DataStream_OperationType.UPDATE; + case TextStreamOperationType.delete: + return lk_models.DataStream_OperationType.DELETE; + case TextStreamOperationType.reaction: + return lk_models.DataStream_OperationType.REACTION; + } + } +} + class TextStreamInfo extends BaseStreamInfo { /// The stream ID this message is replying to, if any final String? replyToStreamId; @@ -179,7 +217,7 @@ class TextStreamInfo extends BaseStreamInfo { final bool generated; /// Operation type for the stream - final String? operationType; + final TextStreamOperationType? operationType; TextStreamInfo({ required String id, diff --git a/test/core/data_stream_test.dart b/test/core/data_stream_test.dart index 20c05f74..59f9ba6a 100644 --- a/test/core/data_stream_test.dart +++ b/test/core/data_stream_test.dart @@ -157,7 +157,12 @@ void main() { }); test('Text Stream With Operation Types', () async { - final operationTypes = ['create', 'update', 'delete', 'reaction']; + final operationTypes = [ + TextStreamOperationType.create, + TextStreamOperationType.update, + TextStreamOperationType.delete, + TextStreamOperationType.reaction, + ]; final receivedMessages = []; for (var operationType in operationTypes) { @@ -176,7 +181,7 @@ void main() { final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'chat-operations', type: operationType, - version: operationType == 'update' ? 2 : null, + version: operationType == TextStreamOperationType.update ? 2 : null, )); await stream?.write('Streamed ${operationType}'); await stream?.close(); @@ -223,13 +228,13 @@ void main() { // Verify that reply metadata is accessible expect(reader.info?.replyToStreamId, originalStreamId); expect(reader.info?.version, 1); - expect(reader.info?.operationType, 'CREATE'); + expect(reader.info?.operationType, TextStreamOperationType.create); }); // Send a reply to an existing stream final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'chat-replies', - type: 'create', + type: TextStreamOperationType.create, streamId: replyStreamId, replyToStreamId: originalStreamId, version: 1, @@ -388,7 +393,7 @@ void main() { final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'concurrent-streams', streamId: 'stream-${i}', - type: 'create', + type: TextStreamOperationType.create, )); await stream?.write('Concurrent message ${i}'); await stream?.close(); @@ -454,7 +459,7 @@ void main() { // Send a message with comprehensive options final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'header-validation', - type: 'create', + type: TextStreamOperationType.create, version: 1, generated: false, attributes: {