Skip to content

Commit

Permalink
SmartPtr: Use shared ptr in RTC TCP connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 13, 2024
1 parent 7b9c52b commit f8ab409
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 61 deletions.
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,7 @@ std::string SrsRtcConnection::desc()

void SrsRtcConnection::expire()
{
// TODO: FIXME: Should set session to expired and remove it by heartbeat checking. Should not remove it directly.
_srs_rtc_manager->remove(this);
}

Expand Down
68 changes: 27 additions & 41 deletions trunk/src/app/srs_app_rtc_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,14 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
}

SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta)
SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) : owner_(NULL)
{
conn_ = conn;
delta_ = delta;
sendonly_skt_ = NULL;
transport_ = new SrsSecurityTransport(this);
peer_port_ = 0;
state_ = SrsRtcNetworkStateInit;
owner_ = NULL;
}

SrsRtcTcpNetwork::~SrsRtcTcpNetwork()
Expand Down Expand Up @@ -694,31 +693,36 @@ void SrsRtcTcpNetwork::dispose()

#define SRS_RTC_TCP_PACKET_MAX 1500

SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port)
{
manager_ = cm;
wrapper_ = NULL;
owner_coroutine_ = NULL;
owner_cid_ = NULL;
cid_ = _srs_context->get_id();

ip_ = cip;
port_ = port;
skt_ = skt;
delta_ = new SrsNetworkDelta();
delta_->set_io(skt_, skt_);
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());
session_ = NULL;
pkt_ = new char[SRS_RTC_TCP_PACKET_MAX];
_srs_rtc_manager->subscribe(this);
}

SrsRtcTcpConn::~SrsRtcTcpConn()
{
_srs_rtc_manager->unsubscribe(this);
trd_->interrupt();
srs_freep(trd_);

srs_freepa(pkt_);
srs_freep(delta_);
srs_freep(skt_);
}

void SrsRtcTcpConn::setup_owner(SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
{
wrapper_ = wrapper;
owner_coroutine_ = owner_coroutine;
owner_cid_ = owner_cid;
}

ISrsKbpsDelta* SrsRtcTcpConn::delta()
{
return delta_;
Expand All @@ -731,17 +735,17 @@ std::string SrsRtcTcpConn::desc()

const SrsContextId& SrsRtcTcpConn::get_id()
{
return trd_->cid();
return cid_;
}

std::string SrsRtcTcpConn::remote_ip()
{
return ip_;
}

srs_error_t SrsRtcTcpConn::start()
void SrsRtcTcpConn::on_executor_done(ISrsInterruptable* executor)
{
return trd_->start();
owner_coroutine_ = NULL;
}

srs_error_t SrsRtcTcpConn::cycle()
Expand All @@ -752,15 +756,10 @@ srs_error_t SrsRtcTcpConn::cycle()
SrsStatistic::instance()->on_disconnect(get_id().c_str(), err);
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_);

// TODO: FIXME: Should manage RTC TCP connection by _srs_rtc_manager.
// Because we use manager to manage this object, not the http connection object, so we must remove it here.
manager_->remove(this);

// TODO: FIXME: When TCP connection(transport) closed, should notify session to dispose, should not free them simultaneously.
// Only remove session when network is established, because client might use other UDP network.
if(session_ && session_->tcp()->is_establelished()) {
session_->tcp()->set_state(SrsRtcNetworkStateClosed);
_srs_rtc_manager->remove(session_);
session_->expire();
}

// For HTTP-API timeout, we think it's done successfully,
Expand Down Expand Up @@ -801,13 +800,18 @@ srs_error_t SrsRtcTcpConn::do_cycle()
{
srs_error_t err = srs_success;

// Update all context id to cid of session.
_srs_context->set_id(cid_);
owner_cid_->set_cid(cid_);

if((err = handshake()) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}

// TODO: FIXME: Handle all bytes of TCP Connection.
while(true) {
if((err = trd_->pull()) != srs_success) {
if (!owner_coroutine_) return err;
if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn");
}

Expand Down Expand Up @@ -859,11 +863,11 @@ srs_error_t SrsRtcTcpConn::handshake()

// Should support only one TCP candidate.
SrsRtcTcpNetwork* network = dynamic_cast<SrsRtcTcpNetwork*>(session->tcp());
if (!network->owner()) {
network->set_owner(this);
if (!network->owner().get()) {
network->set_owner(*wrapper_);
session_ = session;
}
if (network->owner() != this) {
if (network->owner().get() != this) {
return srs_error_new(ERROR_RTC_TCP_UNIQUE, "only support one network");
}

Expand Down Expand Up @@ -939,21 +943,3 @@ srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt)
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}

void SrsRtcTcpConn::on_before_dispose(ISrsResource* c)
{
if (!session_) return;

SrsRtcConnection* conn = dynamic_cast<SrsRtcConnection*>(c);
if(conn == session_) {
session_ = NULL;
// the related rtc connection will be disposed
srs_trace("RTC: tcp conn diposing, because of rtc connection");
trd_->interrupt();
}
}

void SrsRtcTcpConn::on_disposing(ISrsResource* c)
{
return;
}

34 changes: 18 additions & 16 deletions trunk/src/app/srs_app_rtc_network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
private:
// The DTLS transport over this network.
ISrsRtcTransport* transport_;
SrsRtcTcpConn* owner_;
SrsSharedResource<SrsRtcTcpConn> owner_;
private:
std::string peer_ip_;
int peer_port_;
Expand All @@ -189,8 +189,8 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta);
virtual ~SrsRtcTcpNetwork();
public:
void set_owner(SrsRtcTcpConn* v) { owner_ = v; }
SrsRtcTcpConn* owner() { return owner_; }
void set_owner(SrsSharedResource<SrsRtcTcpConn> v) { owner_ = v; }
SrsSharedResource<SrsRtcTcpConn> owner() { return owner_; }
void update_sendonly_socket(ISrsProtocolReadWriter* skt);
//ISrsRtcNetwork
public:
Expand Down Expand Up @@ -232,13 +232,9 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
};

// For WebRTC over TCP.
class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsDisposingHandler
class SrsRtcTcpConn : public ISrsConnection, public ISrsCoroutineHandler, public ISrsExecutorHandler
{
private:
// The manager object to manage the connection.
ISrsResourceManager* manager_;
// Use a coroutine to serve the TCP connection.
SrsCoroutine* trd_;
// The ip and port of client.
std::string ip_;
int port_;
Expand All @@ -249,9 +245,19 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
ISrsProtocolReadWriter* skt_;
// Packet cache.
char* pkt_;
public:
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm);
private:
// The shared resource which own this object, we should never free it because it's managed by shared ptr.
SrsSharedResource<SrsRtcTcpConn>* wrapper_;
// The owner coroutine, allow user to interrupt the loop.
ISrsInterruptable* owner_coroutine_;
ISrsContextIdSetter* owner_cid_;
SrsContextId cid_;
public:
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port);
virtual ~SrsRtcTcpConn();
public:
// Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
void setup_owner(SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
public:
ISrsKbpsDelta* delta();
// Interface ISrsResource.
Expand All @@ -261,9 +267,9 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
// Interface ISrsStartable
// Interface ISrsExecutorHandler
public:
virtual srs_error_t start();
virtual void on_executor_done(ISrsInterruptable* executor);
// Interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
Expand All @@ -273,10 +279,6 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
srs_error_t read_packet(char* pkt, int* nb_pkt);
srs_error_t on_stun(char* pkt, int nb_pkt);
srs_error_t on_tcp_pkt(char* pkt, int nb_pkt);
// Interface of ISrsDisposingHandler
public:
virtual void on_before_dispose(ISrsResource* c);
virtual void on_disposing(ISrsResource* c);
};

#endif
Expand Down
22 changes: 18 additions & 4 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ using namespace std;
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#include <srs_app_rtc_server.hpp>

SrsSignalManager* SrsSignalManager::instance = NULL;

Expand Down Expand Up @@ -1193,8 +1194,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20
&& b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42
) {
// TODO: FIXME: Should manage this connection by _srs_rtc_manager
resource = new SrsRtcTcpConn(io, ip, port, this);
resource = new SrsRtcTcpConn(io, ip, port);
} else {
resource = new SrsHttpxConn(listener == http_listener_, this, io, http_server, ip, port);
}
Expand All @@ -1213,8 +1213,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_server, ip, port);
#ifdef SRS_RTC
} else if (listener == webrtc_listener_) {
// TODO: FIXME: Should manage this connection by _srs_rtc_manager
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port, this);
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port);
#endif
} else if (listener == exporter_listener_) {
// TODO: FIXME: Maybe should support https metrics.
Expand All @@ -1227,11 +1226,26 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
}
}

// For RTC TCP connection, use resource executor to manage the resource.
SrsRtcTcpConn* raw_conn = dynamic_cast<SrsRtcTcpConn*>(resource);
if (raw_conn) {
SrsSharedResource<SrsRtcTcpConn>* conn = new SrsSharedResource<SrsRtcTcpConn>(raw_conn);
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_rtc_manager, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "start executor");
}
return err;
}

// Use connection manager to manage all the resources.
srs_assert(resource);
conn_manager->add(resource);

// If connection is a resource to start, start a coroutine to handle it.
ISrsStartable* conn = dynamic_cast<ISrsStartable*>(resource);
srs_assert(conn);
if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start conn coroutine");
}
Expand Down

0 comments on commit f8ab409

Please sign in to comment.