Skip to content

Commit

Permalink
Add handlers for start/pause publication messages
Browse files Browse the repository at this point in the history
  • Loading branch information
yuting-fan committed Jan 11, 2022
1 parent 43efe53 commit b68c608
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/datachannel/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ func (dataChannel *DataChannel) OutputMessageHandler(log log.T, stopHandler Stop
return dataChannel.HandleAcknowledgeMessage(log, *outputMessage)
case message.ChannelClosedMessage:
dataChannel.HandleChannelClosedMessage(log, stopHandler, sessionID, *outputMessage)
case message.StartPublicationMessage, message.PausePublicationMessage:
return nil
default:
log.Warn("Invalid message type received: %s", outputMessage.MessageType)
}
Expand Down
30 changes: 30 additions & 0 deletions src/datachannel/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,36 @@ func TestDataChannelIncomingMessageHandlerForAcknowledgeMessage(t *testing.T) {
assert.Equal(t, 3, dataChannel.OutgoingMessageBuffer.Messages.Len())
}

func TestDataChannelIncomingMessageHandlerForPausePublicationessage(t *testing.T) {
dataChannel := getDataChannel()
mockChannel := &communicatorMocks.IWebSocketChannel{}
dataChannel.wsChannel = mockChannel

size := 5
streamingMessages = make([]StreamingMessage, size)
serializedClientMessage := make([][]byte, size)
for i := 0; i < size; i++ {
clientMessage := getClientMessage(int64(i), message.PausePublicationMessage, uint32(message.Output), []byte(""))
serializedClientMessage[i], _ = clientMessage.SerializeClientMessage(mockLogger)
streamingMessages[i] = StreamingMessage{
serializedClientMessage[i],
int64(i),
time.Now(),
new(int),
}
}

var handler OutputStreamDataMessageHandler = func(log log.T, outputMessage message.ClientMessage) (bool, error) {
return true, nil
}

var stopHandler Stop

dataChannel.RegisterOutputStreamHandler(handler, true)
err := dataChannel.OutputMessageHandler(logger, stopHandler, sessionId, serializedClientMessages[0])
assert.Nil(t, err)
}

func TestHandshakeRequestHandler(t *testing.T) {
dataChannel := getDataChannel()
mockChannel := &communicatorMocks.IWebSocketChannel{}
Expand Down
7 changes: 7 additions & 0 deletions src/message/clientmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ const (

// ChannelClosedMessage represents message type for ChannelClosed
ChannelClosedMessage = "channel_closed"

// StartPublicationMessage represents the message type that notifies the CLI to start sending stream messages
StartPublicationMessage = "start_publication"

// PausePublicationMessage represents the message type that notifies the CLI to pause sending stream messages
// as the remote data channel is inactive
PausePublicationMessage = "pause_publication"
)

// AcknowledgeContent is used to inform the sender of an acknowledge message that the message has been received.
Expand Down
4 changes: 4 additions & 0 deletions src/message/messageparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func getBytes(log log.T, byteArray []byte, offset int, byteLength int) (result [

// Validate returns error if the message is invalid
func (clientMessage *ClientMessage) Validate() error {
if StartPublicationMessage == clientMessage.MessageType ||
PausePublicationMessage == clientMessage.MessageType {
return nil
}
if clientMessage.HeaderLength == 0 {
return errors.New("HeaderLength cannot be zero")
}
Expand Down
17 changes: 17 additions & 0 deletions src/message/messageparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,23 @@ func TestClientMessage_Validate(t *testing.T) {
assert.NoError(t, err, "An error was thrown when none was expected.")
}

func TestClientMessage_ValidateStartPublicationMessage(t *testing.T) {
u, _ := uuid.Parse(messageId)

clientMessage := ClientMessage{
SchemaVersion: schemaVersion,
SequenceNumber: 1,
Flags: 2,
MessageId: u,
Payload: payload,
PayloadLength: 3,
MessageType: StartPublicationMessage,
}

err := clientMessage.Validate()
assert.NoError(t, err, "Validating StartPublicationMessage should not throw an error")
}

func TestClientMessage_DeserializeDataStreamAcknowledgeContent(t *testing.T) {
t.Logf("Starting test: %s", t.Name())
// ClientMessage is initialized with improperly formatted json data
Expand Down

0 comments on commit b68c608

Please sign in to comment.