diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index ad092c03..085842f5 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1329,6 +1329,19 @@ extension DataStreamRoomMethods on Room { topic: streamHeader.topic, timestamp: streamHeader.timestamp.toInt(), attributes: streamHeader.attributes, + replyToStreamId: streamHeader.textHeader.hasReplyToStreamId() + ? streamHeader.textHeader.replyToStreamId + : null, + attachedStreamIds: streamHeader.textHeader.attachedStreamIds.toList(), + version: streamHeader.textHeader.hasVersion() + ? streamHeader.textHeader.version + : null, + generated: streamHeader.textHeader.hasGenerated() + ? streamHeader.textHeader.generated + : false, + operationType: streamHeader.textHeader.hasOperationType() + ? streamHeader.textHeader.operationType.name + : null, ); final streamController = DataStreamController( diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 9c4f8813..2dfcc3b8 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -1232,20 +1232,26 @@ extension DataStreamParticipantMethods on LocalParticipant { Future streamText(StreamTextOptions? options) async { final streamId = options?.streamId ?? Uuid().v4(); + final timestamp = DateTime.timestamp().millisecondsSinceEpoch; final info = TextStreamInfo( id: streamId, mimeType: 'text/plain', - timestamp: DateTime.timestamp().millisecondsSinceEpoch, + timestamp: timestamp, topic: options?.topic ?? '', size: options?.totalSize ?? 0, + replyToStreamId: options?.replyToStreamId, + attachedStreamIds: options?.attachedStreamIds ?? [], + version: options?.version, + generated: options?.generated ?? false, + operationType: options?.type, ); final header = lk_models.DataStream_Header( streamId: streamId, mimeType: info.mimeType, topic: info.topic, - timestamp: Int64(info.timestamp), + timestamp: Int64(timestamp), totalLength: Int64(options?.totalSize ?? 0), attributes: options?.attributes.entries, textHeader: lk_models.DataStream_TextHeader( @@ -1256,11 +1262,13 @@ extension DataStreamParticipantMethods on LocalParticipant { operationType: _stringToOperationType(options?.type), ), ); + final destinationIdentities = options?.destinationIdentities; final packet = lk_models.DataPacket( destinationIdentities: destinationIdentities, streamHeader: header, ); + await room.engine.sendDataPacket(packet, reliability: true); final writableStream = WritableStream( @@ -1328,12 +1336,13 @@ extension DataStreamParticipantMethods on LocalParticipant { Future streamBytes(StreamBytesOptions? options) async { final streamId = options?.streamId ?? Uuid().v4(); + final timestamp = DateTime.timestamp().millisecondsSinceEpoch; final info = ByteStreamInfo( name: options?.name ?? 'unknown', id: streamId, mimeType: options?.mimeType ?? 'application/octet-stream', - timestamp: DateTime.timestamp().millisecondsSinceEpoch, + timestamp: timestamp, topic: options?.topic ?? '', size: options?.totalSize ?? 0, attributes: options?.attributes ?? {}, @@ -1345,7 +1354,7 @@ extension DataStreamParticipantMethods on LocalParticipant { streamId: streamId, topic: options?.topic, encryptionType: options?.encryptionType, - timestamp: Int64(info.timestamp), + timestamp: Int64(timestamp), byteHeader: lk_models.DataStream_ByteHeader( name: info.name, ), diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index 40676fda..213760e9 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -166,6 +166,21 @@ class ByteStreamInfo extends BaseStreamInfo { } class TextStreamInfo extends BaseStreamInfo { + /// The stream ID this message is replying to, if any + final String? replyToStreamId; + + /// List of stream IDs that are attached to this stream + final List attachedStreamIds; + + /// Version of the stream + final int? version; + + /// Whether this text was generated by an agent + final bool generated; + + /// Operation type for the stream + final String? operationType; + TextStreamInfo({ required String id, required String mimeType, @@ -173,6 +188,11 @@ class TextStreamInfo extends BaseStreamInfo { required int timestamp, required int size, Map attributes = const {}, + this.replyToStreamId, + this.attachedStreamIds = const [], + this.version, + this.generated = false, + this.operationType, }) : super( id: id, mimeType: mimeType, diff --git a/test/core/data_stream_test.dart b/test/core/data_stream_test.dart index 15246576..20c05f74 100644 --- a/test/core/data_stream_test.dart +++ b/test/core/data_stream_test.dart @@ -219,6 +219,11 @@ void main() { final text = await reader.readAll(); print('received reply message: ${text}'); expect(text, 'This is a reply to the original message'); + + // Verify that reply metadata is accessible + expect(reader.info?.replyToStreamId, originalStreamId); + expect(reader.info?.version, 1); + expect(reader.info?.operationType, 'CREATE'); }); // Send a reply to an existing stream