Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Sep 19, 2020
1 parent b1b6f66 commit 33f703a
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 130 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test:
test-nodocker:
$(foreach IMG,$(shell echo test-images/*/ | xargs -n1 basename), \
docker build -q test-images/$(IMG) -t rtsp-simple-server-test-$(IMG)$(NL))
go test -race -v .
go test -race -v -run OnDemand .

stress:
docker build -q . -f stress/$(NAME)/Dockerfile -t temp
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ There are multiple ways to monitor the server usage over time:
```
2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION
```
means that there are 2 clients, 1 publisher and 1 receiver.
means that there are 2 clients, 1 publisher and 1 reader.

* A metrics exporter, compatible with Prometheus, can be enabled with the option `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
```
Expand Down
50 changes: 19 additions & 31 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ const (
type clientDescribeReq struct {
client *client
pathName string
pathConf *pathConf
}

type clientAnnounceReq struct {
res chan error
client *client
pathName string
pathConf *pathConf
trackCount int
sdp []byte
}
Expand Down Expand Up @@ -369,22 +371,21 @@ func (c *client) handleRequest(req *gortsplib.Request) error {

pathName = removeQueryFromPath(pathName)

confp := c.p.findConfForPathName(pathName)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", pathName))
pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}

err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
err = c.authenticate(pathConf.readIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
}
return nil
}

c.p.clientDescribe <- clientDescribeReq{c, pathName}
c.p.clientDescribe <- clientDescribeReq{c, pathName, pathConf}

c.describeCSeq = cseq
c.describeUrl = req.Url.String()
Expand All @@ -400,25 +401,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {

pathName = removeQueryFromPath(pathName)

if len(pathName) == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("empty base path"))
return errRunTerminate
}

err := checkPathName(pathName)
pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid path name: %s (%s)", err, pathName))
return errRunTerminate
}

confp := c.p.findConfForPathName(pathName)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", pathName))
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}

err = c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req)
err = c.authenticate(pathConf.publishIpsParsed, pathConf.PublishUser, pathConf.PublishPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
Expand Down Expand Up @@ -451,7 +440,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
sdp := tracks.Write()

res := make(chan error)
c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, len(tracks), sdp}
c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, pathConf, len(tracks), sdp}
err = <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
Expand Down Expand Up @@ -489,14 +478,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
switch c.state {
// play
case clientStateInitial, clientStatePrePlay:
confp := c.p.findConfForPathName(basePath)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", basePath))
pathConf, err := c.p.conf.checkPathNameAndFindConf(basePath)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}

err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
err = c.authenticate(pathConf.readIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
Expand Down Expand Up @@ -899,9 +887,9 @@ func (c *client) runPlay() bool {
}(), c.streamProtocol)

var onReadCmd *exec.Cmd
if c.path.confp.RunOnRead != "" {
if c.path.conf.RunOnRead != "" {
var err error
onReadCmd, err = startExternalCommand(c.path.confp.RunOnRead, c.path.name)
onReadCmd, err = startExternalCommand(c.path.conf.RunOnRead, c.path.name)
if err != nil {
c.log("ERR: %s", err)
}
Expand Down Expand Up @@ -1050,9 +1038,9 @@ func (c *client) runRecord() bool {
}(), c.streamProtocol)

var onPublishCmd *exec.Cmd
if c.path.confp.RunOnPublish != "" {
if c.path.conf.RunOnPublish != "" {
var err error
onPublishCmd, err = startExternalCommand(c.path.confp.RunOnPublish, c.path.name)
onPublishCmd, err = startExternalCommand(c.path.conf.RunOnPublish, c.path.name)
if err != nil {
c.log("ERR: %s", err)
}
Expand Down
131 changes: 87 additions & 44 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"gopkg.in/yaml.v2"
)

type confPath struct {
type pathConf struct {
regexp *regexp.Regexp
Source string `yaml:"source"`
sourceUrl *url.URL ``
SourceProtocol string `yaml:"sourceProtocol"`
Expand Down Expand Up @@ -48,7 +49,7 @@ type conf struct {
LogDestinations []string `yaml:"logDestinations"`
logDestinationsParsed map[logDestination]struct{} ``
LogFile string `yaml:"logFile"`
Paths map[string]*confPath `yaml:"paths"`
Paths map[string]*pathConf `yaml:"paths"`
}

func loadConf(fpath string, stdin io.Reader) (*conf, error) {
Expand Down Expand Up @@ -172,105 +173,147 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}

if len(conf.Paths) == 0 {
conf.Paths = map[string]*confPath{
conf.Paths = map[string]*pathConf{
"all": {},
}
}

for name, confp := range conf.Paths {
if confp == nil {
conf.Paths[name] = &confPath{}
confp = conf.Paths[name]
// "all" is an alias for "~^.*$"
if _, ok := conf.Paths["all"]; ok {
conf.Paths["~^.*$"] = conf.Paths["all"]
delete(conf.Paths, "all")
}

for name, pconf := range conf.Paths {
if pconf == nil {
conf.Paths[name] = &pathConf{}
pconf = conf.Paths[name]
}

err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
if name == "" {
return nil, fmt.Errorf("path name can not be empty")
}

// normal path
if name[0] != '~' {
err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
}

// regular expression path
} else {
pathRegexp, err := regexp.Compile(name[1:])
if err != nil {
return nil, fmt.Errorf("invalid regular expression: %s", name[1:])
}
pconf.regexp = pathRegexp
}

if confp.Source == "" {
confp.Source = "record"
if pconf.Source == "" {
pconf.Source = "record"
}

if confp.Source != "record" {
if name == "all" {
return nil, fmt.Errorf("path 'all' cannot have a RTSP source; use another path")
if pconf.Source != "record" {
if pconf.regexp != nil {
return nil, fmt.Errorf("a path with a regular expression cannot have a RTSP source; use another path")
}

confp.sourceUrl, err = url.Parse(confp.Source)
pconf.sourceUrl, err = url.Parse(pconf.Source)
if err != nil {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
}
if confp.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
if pconf.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
}
if confp.sourceUrl.Port() == "" {
confp.sourceUrl.Host += ":554"
if pconf.sourceUrl.Port() == "" {
pconf.sourceUrl.Host += ":554"
}
if confp.sourceUrl.User != nil {
pass, _ := confp.sourceUrl.User.Password()
user := confp.sourceUrl.User.Username()
if pconf.sourceUrl.User != nil {
pass, _ := pconf.sourceUrl.User.Password()
user := pconf.sourceUrl.User.Username()
if user != "" && pass == "" ||
user == "" && pass != "" {
fmt.Errorf("username and password must be both provided")
}
}

if confp.SourceProtocol == "" {
confp.SourceProtocol = "udp"
if pconf.SourceProtocol == "" {
pconf.SourceProtocol = "udp"
}
switch confp.SourceProtocol {
switch pconf.SourceProtocol {
case "udp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolUDP
pconf.sourceProtocolParsed = gortsplib.StreamProtocolUDP

case "tcp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolTCP
pconf.sourceProtocolParsed = gortsplib.StreamProtocolTCP

default:
return nil, fmt.Errorf("unsupported protocol '%s'", confp.SourceProtocol)
return nil, fmt.Errorf("unsupported protocol '%s'", pconf.SourceProtocol)
}
}

if confp.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishUser) {
if pconf.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
}
}
if confp.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishPass) {
if pconf.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishPass) {
return nil, fmt.Errorf("publish password must be alphanumeric")
}
}
confp.publishIpsParsed, err = parseIpCidrList(confp.PublishIps)
pconf.publishIpsParsed, err = parseIpCidrList(pconf.PublishIps)
if err != nil {
return nil, err
}

if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
if confp.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadUser) {
if pconf.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadUser) {
return nil, fmt.Errorf("read username must be alphanumeric")
}
}
if confp.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadPass) {
if pconf.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadPass) {
return nil, fmt.Errorf("read password must be alphanumeric")
}
}
if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
confp.readIpsParsed, err = parseIpCidrList(confp.ReadIps)
pconf.readIpsParsed, err = parseIpCidrList(pconf.ReadIps)
if err != nil {
return nil, err
}

if name == "all" && confp.RunOnInit != "" {
return nil, fmt.Errorf("path 'all' does not support option 'runOnInit'; use another path")
if pconf.regexp != nil && pconf.RunOnInit != "" {
return nil, fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path")
}
}

return conf, nil
}

func (conf *conf) checkPathNameAndFindConf(name string) (*pathConf, error) {
err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
}

// normal path
if pconf, ok := conf.Paths[name]; ok {
return pconf, nil
}

// regular expression path
for _, pconf := range conf.Paths {
if pconf.regexp != nil && pconf.regexp.MatchString(name) {
return pconf, nil
}
}

return nil, fmt.Errorf("unable to find a valid configuration for path '%s'", name)
}
Loading

0 comments on commit 33f703a

Please sign in to comment.