Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ extension DataStreamRoomMethods on Room {
? streamHeader.textHeader.generated
: false,
operationType: streamHeader.textHeader.hasOperationType()
? streamHeader.textHeader.operationType.name
? TextStreamOperationType.fromPBType(streamHeader.textHeader.operationType)
: null,
);

Expand Down
18 changes: 1 addition & 17 deletions lib/src/participant/local.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1152,22 +1152,6 @@ extension RPCMethods on LocalParticipant {
}
}

/// Helper function to convert string operation type to enum
lk_models.DataStream_OperationType _stringToOperationType(String? type) {
switch (type?.toLowerCase()) {
case 'create':
return lk_models.DataStream_OperationType.CREATE;
case 'update':
return lk_models.DataStream_OperationType.UPDATE;
case 'delete':
return lk_models.DataStream_OperationType.DELETE;
case 'reaction':
return lk_models.DataStream_OperationType.REACTION;
default:
return lk_models.DataStream_OperationType.CREATE;
}
}

extension DataStreamParticipantMethods on LocalParticipant {
Future<TextStreamInfo> sendText(String text,
{SendTextOptions? options}) async {
Expand Down Expand Up @@ -1259,7 +1243,7 @@ extension DataStreamParticipantMethods on LocalParticipant {
attachedStreamIds: options?.attachedStreamIds,
replyToStreamId: options?.replyToStreamId,
generated: options?.generated ?? false,
operationType: _stringToOperationType(options?.type),
operationType: options?.type?.toPBType(),
),
);

Expand Down
42 changes: 40 additions & 2 deletions lib/src/types/data_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:io' show File;

import '../data_stream/stream_reader.dart';
import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_models.pb.dart' show Encryption_Type, DataStream_Chunk;

const kStreamChunkSize = 15_000;
Expand Down Expand Up @@ -53,7 +54,7 @@ class StreamTextOptions {
int? totalSize;

/// 'create' | 'update' | 'delete' | 'reaction'
String? type;
TextStreamOperationType? type;

/// true if the text has been generated by an agent from a participant's audio transcription
bool generated;
Expand Down Expand Up @@ -165,6 +166,43 @@ class ByteStreamInfo extends BaseStreamInfo {
);
}

/// Operation types for text streams
enum TextStreamOperationType {
create,
update,
delete,
reaction;

static TextStreamOperationType? fromPBType(lk_models.DataStream_OperationType? type) {
if (type == null) return TextStreamOperationType.create;
switch (type) {
case lk_models.DataStream_OperationType.CREATE:
return TextStreamOperationType.create;
case lk_models.DataStream_OperationType.UPDATE:
return TextStreamOperationType.update;
case lk_models.DataStream_OperationType.DELETE:
return TextStreamOperationType.delete;
case lk_models.DataStream_OperationType.REACTION:
return TextStreamOperationType.reaction;
default:
return null;
}
}

lk_models.DataStream_OperationType toPBType() {
switch (this) {
case TextStreamOperationType.create:
return lk_models.DataStream_OperationType.CREATE;
case TextStreamOperationType.update:
return lk_models.DataStream_OperationType.UPDATE;
case TextStreamOperationType.delete:
return lk_models.DataStream_OperationType.DELETE;
case TextStreamOperationType.reaction:
return lk_models.DataStream_OperationType.REACTION;
}
}
}

class TextStreamInfo extends BaseStreamInfo {
/// The stream ID this message is replying to, if any
final String? replyToStreamId;
Expand All @@ -179,7 +217,7 @@ class TextStreamInfo extends BaseStreamInfo {
final bool generated;

/// Operation type for the stream
final String? operationType;
final TextStreamOperationType? operationType;

TextStreamInfo({
required String id,
Expand Down
17 changes: 11 additions & 6 deletions test/core/data_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ void main() {
});

test('Text Stream With Operation Types', () async {
final operationTypes = ['create', 'update', 'delete', 'reaction'];
final operationTypes = [
TextStreamOperationType.create,
TextStreamOperationType.update,
TextStreamOperationType.delete,
TextStreamOperationType.reaction,
];
final receivedMessages = <String>[];

for (var operationType in operationTypes) {
Expand All @@ -176,7 +181,7 @@ void main() {
final stream = await room.localParticipant?.streamText(StreamTextOptions(
topic: 'chat-operations',
type: operationType,
version: operationType == 'update' ? 2 : null,
version: operationType == TextStreamOperationType.update ? 2 : null,
));
await stream?.write('Streamed ${operationType}');
await stream?.close();
Expand Down Expand Up @@ -223,13 +228,13 @@ void main() {
// Verify that reply metadata is accessible
expect(reader.info?.replyToStreamId, originalStreamId);
expect(reader.info?.version, 1);
expect(reader.info?.operationType, 'CREATE');
expect(reader.info?.operationType, TextStreamOperationType.create);
});

// Send a reply to an existing stream
final stream = await room.localParticipant?.streamText(StreamTextOptions(
topic: 'chat-replies',
type: 'create',
type: TextStreamOperationType.create,
streamId: replyStreamId,
replyToStreamId: originalStreamId,
version: 1,
Expand Down Expand Up @@ -388,7 +393,7 @@ void main() {
final stream = await room.localParticipant?.streamText(StreamTextOptions(
topic: 'concurrent-streams',
streamId: 'stream-${i}',
type: 'create',
type: TextStreamOperationType.create,
));
await stream?.write('Concurrent message ${i}');
await stream?.close();
Expand Down Expand Up @@ -454,7 +459,7 @@ void main() {
// Send a message with comprehensive options
final stream = await room.localParticipant?.streamText(StreamTextOptions(
topic: 'header-validation',
type: 'create',
type: TextStreamOperationType.create,
version: 1,
generated: false,
attributes: {
Expand Down
Loading