Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

srt: support standard streamID syntax (#2469) #2919

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 22 additions & 43 deletions internal/servers/srt/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -147,50 +146,30 @@
}

func (c *conn) runInner2(req srtNewConnReq) (bool, error) {
parts := strings.Split(req.connReq.StreamId(), ":")
if (len(parts) < 2 || len(parts) > 5) || (parts[0] != "read" && parts[0] != "publish") {
return false, fmt.Errorf("invalid streamid '%s':"+
" it must be 'action:pathname[:query]' or 'action:pathname:user:pass[:query]', "+
"where action is either read or publish, pathname is the path name, user and pass are the credentials, "+
"query is an optional token containing additional information",
req.connReq.StreamId())
}

pathName := parts[1]
user := ""
pass := ""
query := ""

if len(parts) == 4 || len(parts) == 5 {
user, pass = parts[2], parts[3]
}

if len(parts) == 3 {
query = parts[2]
}

if len(parts) == 5 {
query = parts[4]
var streamID streamID
err := streamID.unmarshal(req.connReq.StreamId())
if err != nil {
return false, fmt.Errorf("invalid stream ID '%s': %w", req.connReq.StreamId(), err)

Check warning on line 152 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L149-L152

Added lines #L149 - L152 were not covered by tests
}

if parts[0] == "publish" {
return c.runPublish(req, pathName, user, pass, query)
if streamID.mode == streamIDModePublish {
return c.runPublish(req, &streamID)

Check warning on line 156 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L155-L156

Added lines #L155 - L156 were not covered by tests
}
return c.runRead(req, pathName, user, pass, query)
return c.runRead(req, &streamID)

Check warning on line 158 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L158

Added line #L158 was not covered by tests
}

func (c *conn) runPublish(req srtNewConnReq, pathName string, user string, pass string, query string) (bool, error) {
func (c *conn) runPublish(req srtNewConnReq, streamID *streamID) (bool, error) {

Check warning on line 161 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L161

Added line #L161 was not covered by tests
res := c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
AccessRequest: defs.PathAccessRequest{
Name: pathName,
Name: streamID.path,

Check warning on line 165 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L165

Added line #L165 was not covered by tests
IP: c.ip(),
Publish: true,
User: user,
Pass: pass,
User: streamID.user,
Pass: streamID.pass,

Check warning on line 169 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L168-L169

Added lines #L168 - L169 were not covered by tests
Proto: defs.AuthProtocolSRT,
ID: &c.uuid,
Query: query,
Query: streamID.query,

Check warning on line 172 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L172

Added line #L172 was not covered by tests
},
})

Expand Down Expand Up @@ -219,8 +198,8 @@

c.mutex.Lock()
c.state = connStatePublish
c.pathName = pathName
c.query = query
c.pathName = streamID.path
c.query = streamID.query

Check warning on line 202 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L201-L202

Added lines #L201 - L202 were not covered by tests
c.sconn = sconn
c.mutex.Unlock()

Expand Down Expand Up @@ -280,17 +259,17 @@
}
}

func (c *conn) runRead(req srtNewConnReq, pathName string, user string, pass string, query string) (bool, error) {
func (c *conn) runRead(req srtNewConnReq, streamID *streamID) (bool, error) {

Check warning on line 262 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L262

Added line #L262 was not covered by tests
res := c.pathManager.AddReader(defs.PathAddReaderReq{
Author: c,
AccessRequest: defs.PathAccessRequest{
Name: pathName,
Name: streamID.path,

Check warning on line 266 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L266

Added line #L266 was not covered by tests
IP: c.ip(),
User: user,
Pass: pass,
User: streamID.user,
Pass: streamID.pass,

Check warning on line 269 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L268-L269

Added lines #L268 - L269 were not covered by tests
Proto: defs.AuthProtocolSRT,
ID: &c.uuid,
Query: query,
Query: streamID.query,

Check warning on line 272 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L272

Added line #L272 was not covered by tests
},
})

Expand Down Expand Up @@ -320,8 +299,8 @@

c.mutex.Lock()
c.state = connStateRead
c.pathName = pathName
c.query = query
c.pathName = streamID.path
c.query = streamID.query

Check warning on line 303 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L302-L303

Added lines #L302 - L303 were not covered by tests
c.sconn = sconn
c.mutex.Unlock()

Expand All @@ -345,7 +324,7 @@
Conf: res.Path.SafeConf(),
ExternalCmdEnv: res.Path.ExternalCmdEnv(),
Reader: c.APIReaderDescribe(),
Query: query,
Query: streamID.query,

Check warning on line 327 in internal/servers/srt/conn.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/conn.go#L327

Added line #L327 was not covered by tests
})
defer onUnreadHook()

Expand Down
100 changes: 100 additions & 0 deletions internal/servers/srt/streamid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package srt

import (
"fmt"
"strings"
)

type streamIDMode int

const (
streamIDModeRead streamIDMode = iota
streamIDModePublish
)

type streamID struct {
mode streamIDMode
path string
query string
user string
pass string
}

func (s *streamID) unmarshal(raw string) error {
// standard syntax
// https://github.com/Haivision/srt/blob/master/docs/features/access-control.md
if strings.HasPrefix(raw, "#!::") {
for _, kv := range strings.Split(raw[len("#!::"):], ",") {
kv2 := strings.SplitN(kv, "=", 2)
if len(kv2) != 2 {
return fmt.Errorf("invalid value")
}

Check warning on line 31 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L30-L31

Added lines #L30 - L31 were not covered by tests

key, value := kv2[0], kv2[1]

switch key {
case "u":
s.user = value

case "r":
s.path = value

case "h":

case "s":
s.pass = value

case "t":

case "m":
switch value {
case "request":
s.mode = streamIDModeRead

Check warning on line 52 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L51-L52

Added lines #L51 - L52 were not covered by tests

case "publish":
s.mode = streamIDModePublish

default:
return fmt.Errorf("unsupported mode '%s'", value)

Check warning on line 58 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

default:
return fmt.Errorf("unsupported key '%s'", key)

Check warning on line 62 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}
}
} else {
parts := strings.Split(raw, ":")
if len(parts) < 2 || len(parts) > 5 {
return fmt.Errorf("stream ID must be 'action:pathname[:query]' or 'action:pathname:user:pass[:query]', " +
"where action is either read or publish, pathname is the path name, user and pass are the credentials, " +
"query is an optional token containing additional information")
}

Check warning on line 71 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L68-L71

Added lines #L68 - L71 were not covered by tests

switch parts[0] {
case "read":
s.mode = streamIDModeRead

case "publish":
s.mode = streamIDModePublish

default:
return fmt.Errorf("stream ID must be 'action:pathname[:query]' or 'action:pathname:user:pass[:query]', " +
"where action is either read or publish, pathname is the path name, user and pass are the credentials, " +
"query is an optional token containing additional information")

Check warning on line 83 in internal/servers/srt/streamid.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/srt/streamid.go#L80-L83

Added lines #L80 - L83 were not covered by tests
}

s.path = parts[1]

if len(parts) == 4 || len(parts) == 5 {
s.user, s.pass = parts[2], parts[3]
}

if len(parts) == 3 {
s.query = parts[2]
} else if len(parts) == 5 {
s.query = parts[4]
}
}

return nil
}
61 changes: 61 additions & 0 deletions internal/servers/srt/streamid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package srt

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestStreamIDUnmarshal(t *testing.T) {
for _, ca := range []struct {
name string
raw string
dec streamID
}{
{
"mediamtx syntax 1",
"read:mypath",
streamID{
mode: streamIDModeRead,
path: "mypath",
},
},
{
"mediamtx syntax 2",
"publish:mypath:myquery",
streamID{
mode: streamIDModePublish,
path: "mypath",
query: "myquery",
},
},
{
"mediamtx syntax 3",
"read:mypath:myuser:mypass:myquery",
streamID{
mode: streamIDModeRead,
path: "mypath",
user: "myuser",
pass: "mypass",
query: "myquery",
},
},
{
"standard syntax",
"#!::u=johnny,t=file,m=publish,r=results.csv,s=mypass,h=myhost.com",
streamID{
mode: streamIDModePublish,
path: "results.csv",
user: "johnny",
pass: "mypass",
},
},
} {
t.Run(ca.name, func(t *testing.T) {
var streamID streamID
err := streamID.unmarshal(ca.raw)
require.NoError(t, err)
require.Equal(t, ca.dec, streamID)
})
}
}
Loading