From b361085a3108ec50c27d6c89d8689a2508704382 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 23 Sep 2025 04:15:31 +0800 Subject: [PATCH 1/2] Implementation --- lib/src/core/room.dart | 13 +++++++++++++ lib/src/participant/local.dart | 5 +++++ lib/src/types/data_stream.dart | 20 ++++++++++++++++++++ test/core/data_stream_test.dart | 5 +++++ 4 files changed, 43 insertions(+) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index ad092c03..085842f5 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1329,6 +1329,19 @@ extension DataStreamRoomMethods on Room { topic: streamHeader.topic, timestamp: streamHeader.timestamp.toInt(), attributes: streamHeader.attributes, + replyToStreamId: streamHeader.textHeader.hasReplyToStreamId() + ? streamHeader.textHeader.replyToStreamId + : null, + attachedStreamIds: streamHeader.textHeader.attachedStreamIds.toList(), + version: streamHeader.textHeader.hasVersion() + ? streamHeader.textHeader.version + : null, + generated: streamHeader.textHeader.hasGenerated() + ? streamHeader.textHeader.generated + : false, + operationType: streamHeader.textHeader.hasOperationType() + ? streamHeader.textHeader.operationType.name + : null, ); final streamController = DataStreamController( diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 9c4f8813..0b9934d7 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -1239,6 +1239,11 @@ extension DataStreamParticipantMethods on LocalParticipant { timestamp: DateTime.timestamp().millisecondsSinceEpoch, topic: options?.topic ?? '', size: options?.totalSize ?? 0, + replyToStreamId: options?.replyToStreamId, + attachedStreamIds: options?.attachedStreamIds ?? [], + version: options?.version, + generated: options?.generated ?? false, + operationType: options?.type, ); final header = lk_models.DataStream_Header( diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index 40676fda..213760e9 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -166,6 +166,21 @@ class ByteStreamInfo extends BaseStreamInfo { } class TextStreamInfo extends BaseStreamInfo { + /// The stream ID this message is replying to, if any + final String? replyToStreamId; + + /// List of stream IDs that are attached to this stream + final List attachedStreamIds; + + /// Version of the stream + final int? version; + + /// Whether this text was generated by an agent + final bool generated; + + /// Operation type for the stream + final String? operationType; + TextStreamInfo({ required String id, required String mimeType, @@ -173,6 +188,11 @@ class TextStreamInfo extends BaseStreamInfo { required int timestamp, required int size, Map attributes = const {}, + this.replyToStreamId, + this.attachedStreamIds = const [], + this.version, + this.generated = false, + this.operationType, }) : super( id: id, mimeType: mimeType, diff --git a/test/core/data_stream_test.dart b/test/core/data_stream_test.dart index 15246576..20c05f74 100644 --- a/test/core/data_stream_test.dart +++ b/test/core/data_stream_test.dart @@ -219,6 +219,11 @@ void main() { final text = await reader.readAll(); print('received reply message: ${text}'); expect(text, 'This is a reply to the original message'); + + // Verify that reply metadata is accessible + expect(reader.info?.replyToStreamId, originalStreamId); + expect(reader.info?.version, 1); + expect(reader.info?.operationType, 'CREATE'); }); // Send a reply to an existing stream From eb2e2b712d1ba4acfd2d72762ff5ad891b8ffbd1 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 23 Sep 2025 04:53:47 +0800 Subject: [PATCH 2/2] Enum for TextStreamOperationType --- lib/src/core/room.dart | 2 +- lib/src/participant/local.dart | 18 +------------- lib/src/types/data_stream.dart | 42 +++++++++++++++++++++++++++++++-- test/core/data_stream_test.dart | 17 ++++++++----- 4 files changed, 53 insertions(+), 26 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 085842f5..8475a900 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1340,7 +1340,7 @@ extension DataStreamRoomMethods on Room { ? streamHeader.textHeader.generated : false, operationType: streamHeader.textHeader.hasOperationType() - ? streamHeader.textHeader.operationType.name + ? TextStreamOperationType.fromPBType(streamHeader.textHeader.operationType) : null, ); diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 0b9934d7..32201ef1 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -1152,22 +1152,6 @@ extension RPCMethods on LocalParticipant { } } -/// Helper function to convert string operation type to enum -lk_models.DataStream_OperationType _stringToOperationType(String? type) { - switch (type?.toLowerCase()) { - case 'create': - return lk_models.DataStream_OperationType.CREATE; - case 'update': - return lk_models.DataStream_OperationType.UPDATE; - case 'delete': - return lk_models.DataStream_OperationType.DELETE; - case 'reaction': - return lk_models.DataStream_OperationType.REACTION; - default: - return lk_models.DataStream_OperationType.CREATE; - } -} - extension DataStreamParticipantMethods on LocalParticipant { Future sendText(String text, {SendTextOptions? options}) async { @@ -1258,7 +1242,7 @@ extension DataStreamParticipantMethods on LocalParticipant { attachedStreamIds: options?.attachedStreamIds, replyToStreamId: options?.replyToStreamId, generated: options?.generated ?? false, - operationType: _stringToOperationType(options?.type), + operationType: options?.type?.toPBType(), ), ); final destinationIdentities = options?.destinationIdentities; diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index 213760e9..6263697a 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:io' show File; import '../data_stream/stream_reader.dart'; +import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_models.pb.dart' show Encryption_Type, DataStream_Chunk; const kStreamChunkSize = 15_000; @@ -53,7 +54,7 @@ class StreamTextOptions { int? totalSize; /// 'create' | 'update' | 'delete' | 'reaction' - String? type; + TextStreamOperationType? type; /// true if the text has been generated by an agent from a participant's audio transcription bool generated; @@ -165,6 +166,43 @@ class ByteStreamInfo extends BaseStreamInfo { ); } +/// Operation types for text streams +enum TextStreamOperationType { + create, + update, + delete, + reaction; + + static TextStreamOperationType? fromPBType(lk_models.DataStream_OperationType? type) { + if (type == null) return TextStreamOperationType.create; + switch (type) { + case lk_models.DataStream_OperationType.CREATE: + return TextStreamOperationType.create; + case lk_models.DataStream_OperationType.UPDATE: + return TextStreamOperationType.update; + case lk_models.DataStream_OperationType.DELETE: + return TextStreamOperationType.delete; + case lk_models.DataStream_OperationType.REACTION: + return TextStreamOperationType.reaction; + default: + return null; + } + } + + lk_models.DataStream_OperationType toPBType() { + switch (this) { + case TextStreamOperationType.create: + return lk_models.DataStream_OperationType.CREATE; + case TextStreamOperationType.update: + return lk_models.DataStream_OperationType.UPDATE; + case TextStreamOperationType.delete: + return lk_models.DataStream_OperationType.DELETE; + case TextStreamOperationType.reaction: + return lk_models.DataStream_OperationType.REACTION; + } + } +} + class TextStreamInfo extends BaseStreamInfo { /// The stream ID this message is replying to, if any final String? replyToStreamId; @@ -179,7 +217,7 @@ class TextStreamInfo extends BaseStreamInfo { final bool generated; /// Operation type for the stream - final String? operationType; + final TextStreamOperationType? operationType; TextStreamInfo({ required String id, diff --git a/test/core/data_stream_test.dart b/test/core/data_stream_test.dart index 20c05f74..59f9ba6a 100644 --- a/test/core/data_stream_test.dart +++ b/test/core/data_stream_test.dart @@ -157,7 +157,12 @@ void main() { }); test('Text Stream With Operation Types', () async { - final operationTypes = ['create', 'update', 'delete', 'reaction']; + final operationTypes = [ + TextStreamOperationType.create, + TextStreamOperationType.update, + TextStreamOperationType.delete, + TextStreamOperationType.reaction, + ]; final receivedMessages = []; for (var operationType in operationTypes) { @@ -176,7 +181,7 @@ void main() { final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'chat-operations', type: operationType, - version: operationType == 'update' ? 2 : null, + version: operationType == TextStreamOperationType.update ? 2 : null, )); await stream?.write('Streamed ${operationType}'); await stream?.close(); @@ -223,13 +228,13 @@ void main() { // Verify that reply metadata is accessible expect(reader.info?.replyToStreamId, originalStreamId); expect(reader.info?.version, 1); - expect(reader.info?.operationType, 'CREATE'); + expect(reader.info?.operationType, TextStreamOperationType.create); }); // Send a reply to an existing stream final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'chat-replies', - type: 'create', + type: TextStreamOperationType.create, streamId: replyStreamId, replyToStreamId: originalStreamId, version: 1, @@ -388,7 +393,7 @@ void main() { final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'concurrent-streams', streamId: 'stream-${i}', - type: 'create', + type: TextStreamOperationType.create, )); await stream?.write('Concurrent message ${i}'); await stream?.close(); @@ -454,7 +459,7 @@ void main() { // Send a message with comprehensive options final stream = await room.localParticipant?.streamText(StreamTextOptions( topic: 'header-validation', - type: 'create', + type: TextStreamOperationType.create, version: 1, generated: false, attributes: {