Skip to content

Commit

Permalink
[flatbuffers] Add pause() and resume() to DataProducer and DataConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Jun 21, 2023
1 parent ab1dfac commit bdee272
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 8 deletions.
4 changes: 3 additions & 1 deletion worker/fbs/notification.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ enum Event: uint8 {
// Notifications to worker.
TRANSPORT_SEND_RTCP = 0,
PRODUCER_SEND,
DATA_PRODUCER_SEND,
DATAPRODUCER_SEND,
// Notifications from worker.
WORKER_RUNNING,
TRANSPORT_SCTP_STATE_CHANGE,
Expand All @@ -37,6 +37,8 @@ enum Event: uint8 {
CONSUMER_TRACE,
DATACONSUMER_BUFFERED_AMOUNT_LOW,
DATACONSUMER_SCTP_SENDBUFFER_FULL,
DATACONSUMER_DATAPRODUCER_PAUSE,
DATACONSUMER_DATAPRODUCER_RESUME,
DATACONSUMER_DATAPRODUCER_CLOSE,
DATACONSUMER_MESSAGE,
ACTIVESPEAKEROBSERVER_DOMINANT_SPEAKER,
Expand Down
4 changes: 4 additions & 0 deletions worker/fbs/request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ enum Method: uint8 {
CONSUMER_ENABLE_TRACE_EVENT,
DATA_PRODUCER_DUMP,
DATA_PRODUCER_GET_STATS,
DATA_PRODUCER_PAUSE,
DATA_PRODUCER_RESUME,
DATA_CONSUMER_DUMP,
DATA_CONSUMER_GET_STATS,
DATA_CONSUMER_PAUSE,
DATA_CONSUMER_RESUME,
DATA_CONSUMER_GET_BUFFERED_AMOUNT,
DATA_CONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD,
DATA_CONSUMER_SEND,
Expand Down
12 changes: 12 additions & 0 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ namespace RTC
}
void TransportConnected();
void TransportDisconnected();
bool IsPaused() const
{
return this->paused;
}
bool IsDataProducerPaused() const
{
return this->dataProducerPaused;
}
void DataProducerPaused();
void DataProducerResumed();
void SctpAssociationConnected();
void SctpAssociationClosed();
void SctpAssociationBufferedAmount(uint32_t bufferedAmount);
Expand Down Expand Up @@ -108,6 +118,8 @@ namespace RTC
std::string protocol;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
bool dataProducerPaused{ false };
bool dataProducerClosed{ false };
size_t messagesSent{ 0u };
size_t bytesSent{ 0u };
Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/DataProducer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace RTC
virtual void OnDataProducerReceiveData(RTC::DataProducer* producer, size_t len) = 0;
virtual void OnDataProducerMessageReceived(
RTC::DataProducer* dataProducer, uint32_t ppid, const uint8_t* msg, size_t len) = 0;
virtual void OnDataProducerPaused(RTC::DataProducer* dataProducer) = 0;
virtual void OnDataProducerResumed(RTC::DataProducer* dataProducer) = 0;
};

public:
Expand Down Expand Up @@ -55,6 +57,10 @@ namespace RTC
{
return this->sctpStreamParameters;
}
bool IsPaused() const
{
return this->paused;
}
void ReceiveMessage(uint32_t ppid, const uint8_t* msg, size_t len);

/* Methods inherited from Channel::ChannelSocket::RequestHandler. */
Expand All @@ -80,6 +86,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
bool paused{ false };
size_t messagesReceived{ 0u };
size_t bytesReceived{ 0u };
};
Expand Down
2 changes: 2 additions & 0 deletions worker/include/RTC/Router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ namespace RTC
RTC::Transport* transport, RTC::Consumer* consumer, uint32_t mappedSsrc) override;
void OnTransportNewDataProducer(RTC::Transport* transport, RTC::DataProducer* dataProducer) override;
void OnTransportDataProducerClosed(RTC::Transport* transport, RTC::DataProducer* dataProducer) override;
void OnTransportDataProducerPaused(RTC::Transport* transport, RTC::DataProducer* dataProducer) override;
void OnTransportDataProducerResumed(RTC::Transport* transport, RTC::DataProducer* dataProducer) override;
void OnTransportDataProducerMessageReceived(
RTC::Transport* transport,
RTC::DataProducer* dataProducer,
Expand Down
5 changes: 5 additions & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ namespace RTC
virtual void OnTransportConsumerClosed(RTC::Transport* transport, RTC::Consumer* consumer) = 0;
virtual void OnTransportConsumerProducerClosed(
RTC::Transport* transport, RTC::Consumer* consumer) = 0;
virtual void OnTransportDataProducerPaused(RTC::Transport* transport, RTC::DataProducer* dataProducer) = 0;
virtual void OnTransportDataProducerResumed(RTC::Transport* transport, RTC::DataProducer* dataProducer) = 0;
virtual void OnTransportConsumerKeyFrameRequested(
RTC::Transport* transport, RTC::Consumer* consumer, uint32_t mappedSsrc) = 0;
virtual void OnTransportNewDataProducer(
Expand Down Expand Up @@ -235,6 +237,9 @@ namespace RTC
}
void OnDataProducerMessageReceived(
RTC::DataProducer* dataProducer, uint32_t ppid, const uint8_t* msg, size_t len) override;
void OnDataProducerPaused(RTC::DataProducer* dataProducer) override;
void OnDataProducerResumed(RTC::DataProducer* dataProducer) override;


/* Pure virtual methods inherited from RTC::DataConsumer::Listener. */
public:
Expand Down
4 changes: 4 additions & 0 deletions worker/src/Channel/ChannelRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ namespace Channel
{ FBS::Request::Method::CONSUMER_ENABLE_TRACE_EVENT, "consumer.enableTraceEvent" },
{ FBS::Request::Method::DATA_PRODUCER_DUMP, "dataProducer.dump" },
{ FBS::Request::Method::DATA_PRODUCER_GET_STATS, "dataProducer.getStats" },
{ FBS::Request::Method::DATA_PRODUCER_PAUSE, "dataProducer.pause" },
{ FBS::Request::Method::DATA_PRODUCER_RESUME, "dataProducer.resume" },
{ FBS::Request::Method::DATA_CONSUMER_DUMP, "dataConsumer.dump" },
{ FBS::Request::Method::DATA_CONSUMER_GET_STATS, "dataConsumer.getStats" },
{ FBS::Request::Method::DATA_CONSUMER_PAUSE, "dataConsumer.pause" },
{ FBS::Request::Method::DATA_CONSUMER_RESUME, "dataConsumer.resume" },
{ FBS::Request::Method::DATA_CONSUMER_GET_BUFFERED_AMOUNT, "dataConsumer.getBufferedAmount" },
{ FBS::Request::Method::DATA_CONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, "dataConsumer.setBufferedAmountLowThreshold" },
{ FBS::Request::Method::DATA_CONSUMER_SEND, "dataConsumer.send" },
Expand Down
Loading

0 comments on commit bdee272

Please sign in to comment.