Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTMP stream forwarding (Forward) is specified through a URL. #231

Closed
wants to merge 12 commits into from
12 changes: 11 additions & 1 deletion trunk/research/librtmp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ else
objs/srs_ingest_flv objs/srs_ingest_rtmp objs/srs_detect_rtmp \
objs/srs_bandwidth_check objs/srs_h264_raw_publish \
objs/srs_audio_raw_publish objs/srs_aac_raw_publish \
objs/srs_rtmp_dump
objs/srs_rtmp_dump \
objs/srs_rtmp_uri_encode \
objs/srs_rtmp_uri_decode
endif

.PHONY: default clean help ssl nossl
Expand Down Expand Up @@ -46,9 +48,11 @@ clean:

# srs library root
SRS_OBJS = ../../objs
SRS_SRC = ../../src
# srs-librtmp for publish/play, built by srs.
SRS_LIBRTMP_I = $(SRS_OBJS)/include/srs_librtmp.h
SRS_LIBRTMP_L = $(SRS_OBJS)/lib/srs_librtmp.a
SRS_LIB_INCLUDE = -I$(SRS_SRC)/app -I$(SRS_SRC)/kernel -I$(SRS_SRC)/core -I$(SRS_SRC)/libs -I$(SRS_SRC)/protocol -I$(SRS_OBJS)
# openssl for complex handshake, built by srs.
SRS_LIBSSL_L =
# public depends, the Makefile or public headers.
Expand Down Expand Up @@ -113,3 +117,9 @@ objs/srs_bandwidth_check: srs_bandwidth_check.c $(SRS_RESEARCH_DEPS) $(SRS_LIBRT

objs/srs_rtmp_dump: srs_rtmp_dump.c $(SRS_RESEARCH_DEPS) $(SRS_LIBRTMP_I) $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L)
$(GCC) srs_rtmp_dump.c $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L) $(EXTRA_CXX_FLAG) -o objs/srs_rtmp_dump

objs/srs_rtmp_uri_encode: srs_rtmp_uri_encode.c $(SRS_RESEARCH_DEPS) $(SRS_LIBRTMP_I) $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L)
g++ srs_rtmp_uri_encode.c $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L) $(EXTRA_CXX_FLAG) -o objs/srs_rtmp_uri_encode $(SRS_LIB_INCLUDE)

objs/srs_rtmp_uri_decode: srs_rtmp_uri_decode.c $(SRS_RESEARCH_DEPS) $(SRS_LIBRTMP_I) $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L)
g++ srs_rtmp_uri_decode.c $(SRS_LIBRTMP_L) $(SRS_LIBSSL_L) $(EXTRA_CXX_FLAG) -o objs/srs_rtmp_uri_decode $(SRS_LIB_INCLUDE)
52 changes: 52 additions & 0 deletions trunk/research/librtmp/srs_rtmp_uri_decode.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
The MIT License (MIT)

Copyright (c) 2013-2015 SRS(simple-rtmp-server)

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
gcc srs_rtmp_dump.c ../../objs/lib/srs_librtmp.a -g -O0 -lstdc++ -o srs_rtmp_dump
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <assert.h>

#include "../../src/protocol/srs_rtmp_utility.hpp"


int main(int argc, char** argv)
{

if ( argc > 1) {
printf("%s\n",srs_UriDecode(argv[1]).c_str());
} else {
printf("Usage: %s after_encode_uri \n"
"For example:\n"
" %s %s \n"
,argv[0],argv[0]
,"rtmp%3A%2F%2F127%2E0%2E0%2E1%3A1935%2Flive%2Flivestream%3Ftime%3DXXXXXX%26token%3DXXXXX"
);
exit(-1);
}

return 0;
}
51 changes: 51 additions & 0 deletions trunk/research/librtmp/srs_rtmp_uri_encode.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
The MIT License (MIT)

Copyright (c) 2013-2015 SRS(simple-rtmp-server)

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
gcc srs_rtmp_dump.c ../../objs/lib/srs_librtmp.a -g -O0 -lstdc++ -o srs_rtmp_dump
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <assert.h>

#include "../../src/protocol/srs_rtmp_utility.hpp"


int main(int argc, char** argv)
{

if ( argc > 1) {
printf("%s\n",srs_UriEncode(argv[1]).c_str());
} else {
printf("Usage: %s url \n"
"For example:\n"
" %s rtmp://127.0.0.1:1935/live/livestream?time=XXXXXX&token=XXXXX\n"
,argv[0],argv[0]
);
exit(-1);
}

return 0;
}
51 changes: 33 additions & 18 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ int SrsForwarder::on_publish()
SrsRequest* req = _req;

// discovery the server port and tcUrl from req and ep_forward.
std::string server, port, tc_url;
discovery_ep(server, port, tc_url);
std::string server, port, tc_url,app,stream ;
discovery_ep(server, port, tc_url,app,stream );

// dead loop check
std::string source_ep = "rtmp://";
Expand Down Expand Up @@ -225,8 +225,8 @@ int SrsForwarder::cycle()
{
int ret = ERROR_SUCCESS;

std::string ep_server, ep_port;
if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
std::string ep_server, ep_port, ep_url,ep_app,ep_stream;
if ((ret = connect_server(ep_server, ep_port, ep_url,ep_app,ep_stream)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(client);
Expand All @@ -238,7 +238,7 @@ int SrsForwarder::cycle()
srs_error("handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
if ((ret = connect_app(ep_server, ep_port, ep_url,ep_app)) != ERROR_SUCCESS) {
srs_error("connect with server failed. ret=%d", ret);
return ret;
}
Expand All @@ -247,7 +247,7 @@ int SrsForwarder::cycle()
return ret;
}

if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) {
if ((ret = client->publish(ep_stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
_req->stream.c_str(), stream_id, ret);
return ret;
Expand All @@ -270,10 +270,18 @@ void SrsForwarder::close_underlayer_socket()
srs_close_stfd(stfd);
}

void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url)
void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url, std::string& ep_app, std::string& ep_stream)
{
SrsRequest* req = _req;

std::string schema;

if ( srs_discovery_rtmp_url(_ep_forward
,schema,server, port
,ep_app,ep_stream) ) {
tc_url = schema + "://" + server + ":" + port + "/" + ep_app ;
return ;
}

server = _ep_forward;
port = SRS_CONSTS_RTMP_DEFAULT_PORT;

Expand All @@ -285,10 +293,13 @@ void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url)
}

// generate tcUrl
tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
tc_url = srs_generate_tc_url(server, req->vhost, req->app, port
, (req->forward.empty() ? req->param : ""));
ep_app = req->app;
ep_stream = req->stream;
}

int SrsForwarder::connect_server(string& ep_server, string& ep_port)
int SrsForwarder::connect_server(string& ep_server, string& ep_port, string& ep_url, string& ep_app, string& ep_stream)
{
int ret = ERROR_SUCCESS;

Expand All @@ -297,12 +308,13 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port)

// discovery the server port and tcUrl from req and ep_forward.
std::string server, s_port, tc_url;
discovery_ep(server, s_port, tc_url);
discovery_ep(server, s_port, tc_url,ep_app,ep_stream);
int port = ::atoi(s_port.c_str());

// output the connected server and port.
ep_server = server;
ep_port = s_port;
ep_url = tc_url;

// open socket.
int64_t timeout = SRS_FORWARDER_SLEEP_US;
Expand All @@ -321,14 +333,14 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port)

kbps->set_io(io, io);

srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d forward_url=%s",
_req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port,ep_url.c_str());

return ret;
}

// TODO: FIXME: refine the connect_app.
int SrsForwarder::connect_app(string ep_server, string ep_port)
int SrsForwarder::connect_app(string ep_server, string ep_port, string ep_url, string ep_app)
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -365,15 +377,18 @@ int SrsForwarder::connect_app(string ep_server, string ep_port)

// generate the tcUrl
std::string param = "";
std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
//std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
//std::string tc_url = ep_url;

// upnode server identity will show in the connect_app of client.
// @see https://github.com/simple-rtmp-server/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
tc_url.c_str(), debug_srs_upnode, ret);
srs_error("forward connect with server,app=%s, tcUrl=%s, dsu=%d.",
ep_app.c_str(),ep_url.c_str(),debug_srs_upnode);
if ((ret = client->connect_app(ep_app, ep_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
ep_url.c_str(), debug_srs_upnode, ret);
return ret;
}

Expand Down
7 changes: 4 additions & 3 deletions trunk/src/app/srs_app_forward.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ class SrsForwarder : public ISrsThreadHandler
virtual int cycle();
private:
virtual void close_underlayer_socket();
virtual void discovery_ep(std::string& server, std::string& port, std::string& tc_url);
virtual int connect_server(std::string& ep_server, std::string& ep_port);
virtual int connect_app(std::string ep_server, std::string ep_port);
virtual void discovery_ep(std::string& server, std::string& port, std::string& tc_url, std::string& ep_app, std::string& ep_stream);
;
virtual int connect_server(std::string& ep_server, std::string& ep_port, std::string& ep_url, std::string& ep_app, std::string& ep_stream);
virtual int connect_app(std::string ep_server, std::string ep_port, std::string ep_url, std::string ep_app);
virtual int forward();
};

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ int SrsHttpClient::connect()
host.c_str(), port, timeout_us, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host, port);
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);

srs_assert(!skt);
skt = new SrsStSocket(stfd);
Expand Down
33 changes: 33 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using namespace std;
#include <srs_app_hds.hpp>
#include <srs_app_statistic.hpp>
#include <srs_core_autofree.hpp>
#include <srs_rtmp_utility.hpp>

#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 40
Expand Down Expand Up @@ -2067,10 +2068,42 @@ void SrsSource::on_edge_proxy_unpublish()
publish_edge->on_proxy_unpublish();
}

int SrsSource::create_forwarder_by_url(std::string forward_server)
{
int ret = ERROR_SUCCESS;

if ( forward_server.empty() ) {
srs_info("forwarders forwards is emptey");
return ret;
}

srs_trace("create forwarders by url. forwardUrl:%s",_req->forward.c_str());
SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);

// initialize the forwarder with request.
if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) {
return ret;
}

double queue_size = _srs_config->get_queue_length(_req->vhost);
forwarder->set_queue_size(queue_size);

if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
srs_error("start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s",
_req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
forward_server.c_str());
return ret;
}
}

int SrsSource::create_forwarders()
{
int ret = ERROR_SUCCESS;

create_forwarder_by_url(_req->forward);

SrsConfDirective* conf = _srs_config->get_forward(_req->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ class SrsSource : public ISrsReloadHandler
virtual void on_edge_proxy_unpublish();
private:
virtual int create_forwarders();
virtual int create_forwarder_by_url(std::string forward_server);
virtual void destroy_forwarders();
};

Expand Down
Loading