Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward: support config full rtmp url forward to other server #2799

Merged
merged 6 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions trunk/conf/forward.backend.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# the config for srs to forward to slave service
# @see https://github.com/ossrs/srs/wiki/v5_CN_SampleForward
# @see full.conf for detail config.

listen 1935;
max_connections 1000;
pid ./objs/srs.backend.pid;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
forward {
enabled on;
backend http://127.0.0.1:8085/api/v1/forward;
}
}
30 changes: 30 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,36 @@ vhost same.vhost.forward.srs.com {
# active-active for cdn to build high available fault tolerance system.
# format: {ip}:{port} {ip_N}:{port_N}
destination 127.0.0.1:1936 127.0.0.1:1937;

# when client(encoder) publish to vhost/app/stream, call the hook in creating backend forwarder.
# the request in the POST data string is a object encode by json:
# {
# "action": "on_forward",
# "server_id": "vid-k21d7y2",
# "client_id": "9o7g1330",
# "ip": "127.0.0.1",
# "vhost": "__defaultVhost__",
# "app": "live",
# "tcUrl": "rtmp://127.0.0.1:1935/live",
# "stream": "livestream",
# "param": ""
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
# {
# "code": 0,
# "data": {
# "urls":[
# "rtmp://127.0.0.1:19350/test/teststream"
# ]
# }
# }
# PS: you can transform params to backend service, such as:
# { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" }
# then backend return forward's url in response.
# only support one api hook, format:
# backend http://xxx/api0
winlinvip marked this conversation as resolved.
Show resolved Hide resolved
backend http://127.0.0.1:8085/api/v1/forward;
}
}

Expand Down
81 changes: 81 additions & 0 deletions trunk/research/api-server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,86 @@ def POST(self):
def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

'''
handle the forward requests: dynamic forward url.
'''
class RESTForward(object):
exposed = True

def __init__(self):
self.__forwards = []

def GET(self):
enable_crossdomain()

forwards = {}
return json.dumps(forwards)

'''
for SRS hook: on_forward
on_forward:
when srs reap a dvr file, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_forward",
"server_id": "server_test",
"client_id": 1985,
"ip": "192.168.1.10",
"vhost": "video.test.com",
"app": "live",
"tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
"stream": "livestream",
"param":"?token=xxx&salt=yyy"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
'''
def POST(self):
enable_crossdomain()

# return the error code in str
code = Error.success

req = cherrypy.request.body.read()
trace("post to forwards, req=%s"%(req))
try:
json_req = json.loads(req)
except Exception, ex:
code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return json.dumps({"code": int(code), "data": None})

action = json_req["action"]
if action == "on_forward":
return self.__on_forward(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action

return json.dumps({"code": int(code), "data": None})

def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

def __on_forward(self, req):
code = Error.success

trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"]
))

'''
backend service config description:
support multiple rtmp urls(custom addresses or third-party cdn service),
url's host is slave service.
For example:
["rtmp://127.0.0.1:19350/test/teststream", "rtmp://127.0.0.1:19350/test/teststream?token=xxxx"]
'''
forwards = ["rtmp://127.0.0.1:19350/test/teststream"]

return json.dumps({"code": int(code), "data": {"urls": forwards}})
winlinvip marked this conversation as resolved.
Show resolved Hide resolved

# HTTP RESTful path.
class Root(object):
exposed = True
Expand Down Expand Up @@ -846,6 +926,7 @@ def __init__(self):
self.chats = RESTChats()
self.servers = RESTServers()
self.snapshots = RESTSnapshots()
self.forward = RESTForward()
def GET(self):
enable_crossdomain();
return json.dumps({"code":Error.success, "urls":{
Expand Down
17 changes: 16 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2795,7 +2795,7 @@ srs_error_t SrsConfig::check_normal_config()
} else if (n == "forward") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "destination") {
if (m != "enabled" && m != "destination" && m != "backend") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -4605,6 +4605,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost)
return conf->get("destination");
}

SrsConfDirective* SrsConfig::get_forward_backend(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}

conf = conf->get("forward");
if (!conf) {
return NULL;
}

return conf->get("backend");
}

SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ class SrsConfig
virtual bool get_forward_enabled(SrsConfDirective* vhost);
// Get the forward directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost);
// Get the forward directive of backend.
virtual SrsConfDirective* get_forward_backend(std::string vhost);

public:
// Whether the srt sevice enabled
Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder()

srs_freep(sh_video);
srs_freep(sh_audio);

srs_freep(req);
}

srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
Expand All @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)

// it's ok to use the request object,
// SrsLiveSource already copy it and never delete it.
req = r;
req = r->copy();

// the ep(endpoint) to forward to
ep_forward = ep;
Expand Down
70 changes: 70 additions & 0 deletions trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,76 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por
return err;
}

srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector<std::string>& rtmp_urls)
{
srs_error_t err = srs_success;

SrsContextId cid = _srs_context->get_id();

SrsStatistic* stat = SrsStatistic::instance();

SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);

obj->set("action", SrsJsonAny::str("on_forward"));
obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

std::string data = obj->dumps();
std::string res;
int status_code;

SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}

// parse string res to json.
SrsJsonAny* info = SrsJsonAny::loads(res);
if (!info) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str());
}
SrsAutoFree(SrsJsonAny, info);

// response error code in string.
if (!info->is_object()) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str());
}

SrsJsonAny* prop = NULL;
// response standard object, format in json: {}
SrsJsonObject* res_info = info->to_object();
if ((prop = res_info->ensure_property_object("data")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str());
}

SrsJsonObject* p = prop->to_object();
if ((prop = p->ensure_property_array("urls")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse urls %s", res.c_str());
}

SrsJsonArray* urls = prop->to_array();
for (int i = 0; i < urls->count(); i++) {
prop = urls->at(i);
string rtmp_url = prop->to_str();
if (!rtmp_url.empty()) {
rtmp_urls.push_back(rtmp_url);
}
}

srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s",
cid.c_str(), url.c_str(), data.c_str(), res.c_str());

return err;
}

srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
{
srs_error_t err = srs_success;
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_http_hooks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <srs_core.hpp>

#include <string>
#include <vector>

class SrsHttpUri;
class SrsStSocket;
Expand Down Expand Up @@ -79,6 +80,10 @@ class SrsHttpHooks
static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
// Discover co-workers for origin cluster.
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
// The on_forward_backend hook, when publish stream start to forward
// @param url the api server url, to valid the client.
// ignore if empty.
static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector<std::string>& rtmp_urls);
private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
};
Expand Down
Loading