forked from open-webrtc-toolkit/owt-deps-webrtc
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor DataChannel control out of PeerConnection
This is step 1-3 of the refactoring process outlined in comment #1 of bugs.webrtc.org/11146 Bug: webrtc:11146 Change-Id: Iccad009bc0585f99d207a6ddb42fd8e71312fc0a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161003 Commit-Queue: Harald Alvestrand <hta@webrtc.org> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#29970}
- Loading branch information
Harald Alvestrand
authored and
Commit Bot
committed
Dec 2, 2019
1 parent
b877e71
commit 00cf34c
Showing
4 changed files
with
538 additions
and
419 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,363 @@ | ||
/* | ||
* Copyright 2019 The WebRTC project authors. All Rights Reserved. | ||
* | ||
* Use of this source code is governed by a BSD-style license | ||
* that can be found in the LICENSE file in the root of the source | ||
* tree. An additional intellectual property rights grant can be found | ||
* in the file PATENTS. All contributing project authors may | ||
* be found in the AUTHORS file in the root of the source tree. | ||
*/ | ||
|
||
// This file contains the implementation of the class | ||
// webrtc::PeerConnection::DataChannelController. | ||
// | ||
// The intent is that this should be webrtc::DataChannelController, but | ||
// as a migration stage, it is simpler to have it as an inner class, | ||
// declared in the header file pc/peer_connection.h | ||
|
||
#include "pc/peer_connection.h" | ||
#include "pc/sctp_utils.h" | ||
|
||
namespace webrtc { | ||
|
||
bool PeerConnection::DataChannelController::SendData( | ||
const cricket::SendDataParams& params, | ||
const rtc::CopyOnWriteBuffer& payload, | ||
cricket::SendDataResult* result) { | ||
// RTC_DCHECK_RUN_ON(signaling_thread()); | ||
if (data_channel_transport()) { | ||
SendDataParams send_params; | ||
send_params.type = ToWebrtcDataMessageType(params.type); | ||
send_params.ordered = params.ordered; | ||
if (params.max_rtx_count >= 0) { | ||
send_params.max_rtx_count = params.max_rtx_count; | ||
} else if (params.max_rtx_ms >= 0) { | ||
send_params.max_rtx_ms = params.max_rtx_ms; | ||
} | ||
|
||
RTCError error = network_thread()->Invoke<RTCError>( | ||
RTC_FROM_HERE, [this, params, send_params, payload] { | ||
return data_channel_transport()->SendData(params.sid, send_params, | ||
payload); | ||
}); | ||
|
||
if (error.ok()) { | ||
*result = cricket::SendDataResult::SDR_SUCCESS; | ||
return true; | ||
} else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { | ||
// SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. | ||
// TODO(mellem): Stop using RTCError here and get rid of the mapping. | ||
*result = cricket::SendDataResult::SDR_BLOCK; | ||
return false; | ||
} | ||
*result = cricket::SendDataResult::SDR_ERROR; | ||
return false; | ||
} else if (rtp_data_channel()) { | ||
return rtp_data_channel()->SendData(params, payload, result); | ||
} | ||
RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; | ||
return false; | ||
} | ||
|
||
bool PeerConnection::DataChannelController::ConnectDataChannel( | ||
DataChannel* webrtc_data_channel) { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
if (!rtp_data_channel() && !data_channel_transport()) { | ||
// Don't log an error here, because DataChannels are expected to call | ||
// ConnectDataChannel in this state. It's the only way to initially tell | ||
// whether or not the underlying transport is ready. | ||
return false; | ||
} | ||
if (data_channel_transport()) { | ||
SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, | ||
&DataChannel::OnChannelReady); | ||
SignalDataChannelTransportReceivedData_s.connect( | ||
webrtc_data_channel, &DataChannel::OnDataReceived); | ||
SignalDataChannelTransportChannelClosing_s.connect( | ||
webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); | ||
SignalDataChannelTransportChannelClosed_s.connect( | ||
webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); | ||
} | ||
if (rtp_data_channel()) { | ||
rtp_data_channel()->SignalReadyToSendData.connect( | ||
webrtc_data_channel, &DataChannel::OnChannelReady); | ||
rtp_data_channel()->SignalDataReceived.connect( | ||
webrtc_data_channel, &DataChannel::OnDataReceived); | ||
} | ||
return true; | ||
} | ||
|
||
void PeerConnection::DataChannelController::DisconnectDataChannel( | ||
DataChannel* webrtc_data_channel) { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
if (!rtp_data_channel() && !data_channel_transport()) { | ||
RTC_LOG(LS_ERROR) | ||
<< "DisconnectDataChannel called when rtp_data_channel_ and " | ||
"sctp_transport_ are NULL."; | ||
return; | ||
} | ||
if (data_channel_transport()) { | ||
SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); | ||
SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); | ||
SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); | ||
SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); | ||
} | ||
if (rtp_data_channel()) { | ||
rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel); | ||
rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel); | ||
} | ||
} | ||
|
||
void PeerConnection::DataChannelController::AddSctpDataStream(int sid) { | ||
if (data_channel_transport()) { | ||
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] { | ||
if (data_channel_transport()) { | ||
data_channel_transport()->OpenChannel(sid); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) { | ||
if (data_channel_transport()) { | ||
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] { | ||
if (data_channel_transport()) { | ||
data_channel_transport()->CloseChannel(sid); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
bool PeerConnection::DataChannelController::ReadyToSendData() const { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) || | ||
(data_channel_transport() && data_channel_transport_ready_to_send_); | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnDataReceived( | ||
int channel_id, | ||
DataMessageType type, | ||
const rtc::CopyOnWriteBuffer& buffer) { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
cricket::ReceiveDataParams params; | ||
params.sid = channel_id; | ||
params.type = ToCricketDataMessageType(type); | ||
data_channel_transport_invoker_->AsyncInvoke<void>( | ||
RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
if (!HandleOpenMessage_s(params, buffer)) { | ||
SignalDataChannelTransportReceivedData_s(params, buffer); | ||
} | ||
}); | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
data_channel_transport_invoker_->AsyncInvoke<void>( | ||
RTC_FROM_HERE, signaling_thread(), [this, channel_id] { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
SignalDataChannelTransportChannelClosing_s(channel_id); | ||
}); | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
data_channel_transport_invoker_->AsyncInvoke<void>( | ||
RTC_FROM_HERE, signaling_thread(), [this, channel_id] { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
SignalDataChannelTransportChannelClosed_s(channel_id); | ||
}); | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnReadyToSend() { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
data_channel_transport_invoker_->AsyncInvoke<void>( | ||
RTC_FROM_HERE, signaling_thread(), [this] { | ||
RTC_DCHECK_RUN_ON(signaling_thread()); | ||
data_channel_transport_ready_to_send_ = true; | ||
SignalDataChannelTransportWritable_s( | ||
data_channel_transport_ready_to_send_); | ||
}); | ||
} | ||
|
||
void PeerConnection::DataChannelController::SetupDataChannelTransport_n() { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>(); | ||
} | ||
|
||
void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
data_channel_transport_invoker_ = nullptr; | ||
if (data_channel_transport()) { | ||
data_channel_transport()->SetDataSink(nullptr); | ||
} | ||
set_data_channel_transport(nullptr); | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnTransportChanged( | ||
DataChannelTransportInterface* new_data_channel_transport) { | ||
RTC_DCHECK_RUN_ON(network_thread()); | ||
if (data_channel_transport() && | ||
data_channel_transport() != new_data_channel_transport) { | ||
// Changed which data channel transport is used for |sctp_mid_| (eg. now | ||
// it's bundled). | ||
data_channel_transport()->SetDataSink(nullptr); | ||
set_data_channel_transport(new_data_channel_transport); | ||
if (new_data_channel_transport) { | ||
new_data_channel_transport->SetDataSink(this); | ||
|
||
// There's a new data channel transport. This needs to be signaled to the | ||
// |sctp_data_channels_| so that they can reopen and reconnect. This is | ||
// necessary when bundling is applied. | ||
data_channel_transport_invoker_->AsyncInvoke<void>( | ||
RTC_FROM_HERE, signaling_thread(), [this] { | ||
RTC_DCHECK_RUN_ON(pc_->signaling_thread()); | ||
for (auto channel : pc_->sctp_data_channels_) { | ||
channel->OnTransportChannelCreated(); | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
|
||
bool PeerConnection::DataChannelController::HandleOpenMessage_s( | ||
const cricket::ReceiveDataParams& params, | ||
const rtc::CopyOnWriteBuffer& buffer) { | ||
if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) { | ||
// Received OPEN message; parse and signal that a new data channel should | ||
// be created. | ||
std::string label; | ||
InternalDataChannelInit config; | ||
config.id = params.ssrc; | ||
if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { | ||
RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc " | ||
<< params.ssrc; | ||
return true; | ||
} | ||
config.open_handshake_role = InternalDataChannelInit::kAcker; | ||
OnDataChannelOpenMessage(label, config); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnDataChannelOpenMessage( | ||
const std::string& label, | ||
const InternalDataChannelInit& config) { | ||
rtc::scoped_refptr<DataChannel> channel( | ||
InternalCreateDataChannel(label, &config)); | ||
if (!channel.get()) { | ||
RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."; | ||
return; | ||
} | ||
|
||
rtc::scoped_refptr<DataChannelInterface> proxy_channel = | ||
DataChannelProxy::Create(signaling_thread(), channel); | ||
{ | ||
RTC_DCHECK_RUN_ON(pc_->signaling_thread()); | ||
pc_->Observer()->OnDataChannel(std::move(proxy_channel)); | ||
pc_->NoteUsageEvent(UsageEvent::DATA_ADDED); | ||
} | ||
} | ||
|
||
rtc::scoped_refptr<DataChannel> | ||
PeerConnection::DataChannelController::InternalCreateDataChannel( | ||
const std::string& label, | ||
const InternalDataChannelInit* config) { | ||
RTC_DCHECK_RUN_ON(pc_->signaling_thread()); | ||
if (pc_->IsClosed()) { | ||
return nullptr; | ||
} | ||
if (pc_->data_channel_type() == cricket::DCT_NONE) { | ||
RTC_LOG(LS_ERROR) | ||
<< "InternalCreateDataChannel: Data is not supported in this call."; | ||
return nullptr; | ||
} | ||
InternalDataChannelInit new_config = | ||
config ? (*config) : InternalDataChannelInit(); | ||
if (DataChannel::IsSctpLike(pc_->data_channel_type_)) { | ||
if (new_config.id < 0) { | ||
rtc::SSLRole role; | ||
if ((pc_->GetSctpSslRole(&role)) && | ||
!sid_allocator_.AllocateSid(role, &new_config.id)) { | ||
RTC_LOG(LS_ERROR) | ||
<< "No id can be allocated for the SCTP data channel."; | ||
return nullptr; | ||
} | ||
} else if (!sid_allocator_.ReserveSid(new_config.id)) { | ||
RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " | ||
"because the id is already in use or out of range."; | ||
return nullptr; | ||
} | ||
} | ||
|
||
rtc::scoped_refptr<DataChannel> channel( | ||
DataChannel::Create(this, pc_->data_channel_type(), label, new_config)); | ||
if (!channel) { | ||
sid_allocator_.ReleaseSid(new_config.id); | ||
return nullptr; | ||
} | ||
|
||
if (channel->data_channel_type() == cricket::DCT_RTP) { | ||
if (pc_->rtp_data_channels_.find(channel->label()) != | ||
pc_->rtp_data_channels_.end()) { | ||
RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() | ||
<< " already exists."; | ||
return nullptr; | ||
} | ||
pc_->rtp_data_channels_[channel->label()] = channel; | ||
} else { | ||
RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_)); | ||
pc_->sctp_data_channels_.push_back(channel); | ||
channel->SignalClosed.connect(pc_, | ||
&PeerConnection::OnSctpDataChannelClosed); | ||
} | ||
|
||
pc_->SignalDataChannelCreated_(channel.get()); | ||
return channel; | ||
} | ||
|
||
void PeerConnection::DataChannelController::AllocateSctpSids( | ||
rtc::SSLRole role) { | ||
RTC_DCHECK_RUN_ON(pc_->signaling_thread()); | ||
std::vector<rtc::scoped_refptr<DataChannel>> channels_to_close; | ||
for (const auto& channel : pc_->sctp_data_channels_) { | ||
if (channel->id() < 0) { | ||
int sid; | ||
if (!sid_allocator_.AllocateSid(role, &sid)) { | ||
RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel."; | ||
channels_to_close.push_back(channel); | ||
continue; | ||
} | ||
channel->SetSctpSid(sid); | ||
} | ||
} | ||
// Since closing modifies the list of channels, we have to do the actual | ||
// closing outside the loop. | ||
for (const auto& channel : channels_to_close) { | ||
channel->CloseAbruptly(); | ||
} | ||
} | ||
|
||
void PeerConnection::DataChannelController::OnSctpDataChannelClosed( | ||
DataChannel* channel) { | ||
RTC_DCHECK_RUN_ON(pc_->signaling_thread()); | ||
for (auto it = pc_->sctp_data_channels_.begin(); | ||
it != pc_->sctp_data_channels_.end(); ++it) { | ||
if (it->get() == channel) { | ||
if (channel->id() >= 0) { | ||
// After the closing procedure is done, it's safe to use this ID for | ||
// another data channel. | ||
sid_allocator_.ReleaseSid(channel->id()); | ||
} | ||
// Since this method is triggered by a signal from the DataChannel, | ||
// we can't free it directly here; we need to free it asynchronously. | ||
pc_->sctp_data_channels_to_free_.push_back(*it); | ||
pc_->sctp_data_channels_.erase(it); | ||
pc_->SignalFreeDataChannels(); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
} // namespace webrtc |
Oops, something went wrong.