Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add 'query' field to RTMP, RTSP, SRT and WebRTC clients (#2689) #2844

Merged
merged 1 commit into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ components:
enum: [idle, read, publish]
path:
type: string
query:
type: string
bytesReceived:
type: integer
format: int64
Expand Down Expand Up @@ -544,6 +546,8 @@ components:
enum: [idle, read, publish]
path:
type: string
query:
type: string
transport:
type: string
nullable: true
Expand Down Expand Up @@ -578,6 +582,8 @@ components:
enum: [idle, read, publish]
path:
type: string
query:
type: string
bytesReceived:
type: integer
format: int64
Expand Down Expand Up @@ -615,6 +621,8 @@ components:
enum: [read, publish]
path:
type: string
query:
type: string
bytesReceived:
type: integer
format: int64
Expand Down
14 changes: 9 additions & 5 deletions internal/core/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func TestAPIProtocolList(t *testing.T) {
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}

err := source.StartRecording("rtsp://localhost:8554/mypath",
err := source.StartRecording("rtsp://localhost:8554/mypath?key=val",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
Expand All @@ -698,7 +698,7 @@ func TestAPIProtocolList(t *testing.T) {
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}

err := source.StartRecording("rtsps://localhost:8322/mypath",
err := source.StartRecording("rtsps://localhost:8322/mypath?key=val",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
Expand All @@ -711,7 +711,7 @@ func TestAPIProtocolList(t *testing.T) {
port = "1936"
}

u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath")
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath?key=val")
require.NoError(t, err)

nconn, err := func() (net.Conn, error) {
Expand Down Expand Up @@ -795,7 +795,7 @@ func TestAPIProtocolList(t *testing.T) {
require.NoError(t, err)
defer source.Close()

u, err := url.Parse("http://localhost:8889/mypath/whep")
u, err := url.Parse("http://localhost:8889/mypath/whep?key=val")
require.NoError(t, err)

go func() {
Expand Down Expand Up @@ -826,7 +826,7 @@ func TestAPIProtocolList(t *testing.T) {

case "srt":
conf := srt.DefaultConfig()
conf.StreamId = "publish:mypath"
conf.StreamId = "publish:mypath:::key=val"

conn, err := srt.Dial("srt", "localhost:8890", conf)
require.NoError(t, err)
Expand Down Expand Up @@ -878,6 +878,7 @@ func TestAPIProtocolList(t *testing.T) {
type item struct {
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}

var out struct {
Expand All @@ -890,6 +891,7 @@ func TestAPIProtocolList(t *testing.T) {
require.Equal(t, item{
State: "publish",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
}

Expand All @@ -914,6 +916,7 @@ func TestAPIProtocolList(t *testing.T) {
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}

var out struct {
Expand All @@ -926,6 +929,7 @@ func TestAPIProtocolList(t *testing.T) {
PeerConnectionEstablished: true,
State: "read",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
}
})
Expand Down
36 changes: 20 additions & 16 deletions internal/defs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,6 @@ type APIHLSMuxerList struct {
Items []*APIHLSMuxer `json:"items"`
}

// APIRTSPConn is a RTSP connection.
type APIRTSPConn struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

// APIRTSPConnsList is a list of RTSP connections.
type APIRTSPConnsList struct {
ItemCount int `json:"itemCount"`
PageCount int `json:"pageCount"`
Items []*APIRTSPConn `json:"items"`
}

// APIRTMPConnState is the state of a RTMP connection.
type APIRTMPConnState string

Expand All @@ -94,6 +78,7 @@ type APIRTMPConn struct {
RemoteAddr string `json:"remoteAddr"`
State APIRTMPConnState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand All @@ -105,6 +90,22 @@ type APIRTMPConnList struct {
Items []*APIRTMPConn `json:"items"`
}

// APIRTSPConn is a RTSP connection.
type APIRTSPConn struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

// APIRTSPConnsList is a list of RTSP connections.
type APIRTSPConnsList struct {
ItemCount int `json:"itemCount"`
PageCount int `json:"pageCount"`
Items []*APIRTSPConn `json:"items"`
}

// APIRTSPSessionState is the state of a RTSP session.
type APIRTSPSessionState string

Expand All @@ -122,6 +123,7 @@ type APIRTSPSession struct {
RemoteAddr string `json:"remoteAddr"`
State APIRTSPSessionState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
Transport *string `json:"transport"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
Expand Down Expand Up @@ -151,6 +153,7 @@ type APISRTConn struct {
RemoteAddr string `json:"remoteAddr"`
State APISRTConnState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand Down Expand Up @@ -181,6 +184,7 @@ type APIWebRTCSession struct {
RemoteCandidate string `json:"remoteCandidate"`
State APIWebRTCSessionState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand Down
4 changes: 4 additions & 0 deletions internal/servers/rtmp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type conn struct {
rconn *rtmp.Conn
state connState
pathName string
query string
}

func (c *conn) initialize() {
Expand Down Expand Up @@ -191,6 +192,7 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error {
c.mutex.Lock()
c.state = connStateRead
c.pathName = pathName
c.query = rawQuery
c.mutex.Unlock()

writer := asyncwriter.New(c.writeQueueSize, c)
Expand Down Expand Up @@ -421,6 +423,7 @@ func (c *conn) runPublish(conn *rtmp.Conn, u *url.URL) error {
c.mutex.Lock()
c.state = connStatePublish
c.pathName = pathName
c.query = rawQuery
c.mutex.Unlock()

r, err := rtmp.NewReader(conn)
Expand Down Expand Up @@ -594,6 +597,7 @@ func (c *conn) apiItem() *defs.APIRTMPConn {
}
}(),
Path: c.pathName,
Query: c.query,
BytesReceived: bytesReceived,
BytesSent: bytesSent,
}
Expand Down
6 changes: 5 additions & 1 deletion internal/servers/rtsp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type session struct {
state gortsplib.ServerSessionState
transport *gortsplib.Transport
pathName string
query string
decodeErrLogger logger.Writer
writeErrLogger logger.Writer
}
Expand Down Expand Up @@ -140,6 +141,7 @@ func (s *session) onAnnounce(c *conn, ctx *gortsplib.ServerHandlerOnAnnounceCtx)
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord
s.pathName = ctx.Path
s.query = ctx.Query
s.mutex.Unlock()

return &base.Response{
Expand Down Expand Up @@ -232,6 +234,7 @@ func (s *session) onSetup(c *conn, ctx *gortsplib.ServerHandlerOnSetupCtx,
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.pathName = ctx.Path
s.query = ctx.Query
s.mutex.Unlock()

var stream *gortsplib.ServerStream
Expand Down Expand Up @@ -400,7 +403,8 @@ func (s *session) apiItem() *defs.APIRTSPSession {
}
return defs.APIRTSPSessionStateIdle
}(),
Path: s.pathName,
Path: s.pathName,
Query: s.query,
Transport: func() *string {
if s.transport == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/servers/srt/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type conn struct {
mutex sync.RWMutex
state connState
pathName string
query string
sconn srt.Conn

chNew chan srtNewConnReq
Expand Down Expand Up @@ -218,6 +219,7 @@ func (c *conn) runPublish(req srtNewConnReq, pathName string, user string, pass
c.mutex.Lock()
c.state = connStatePublish
c.pathName = pathName
c.query = query
c.sconn = sconn
c.mutex.Unlock()

Expand Down Expand Up @@ -317,6 +319,7 @@ func (c *conn) runRead(req srtNewConnReq, pathName string, user string, pass str
c.mutex.Lock()
c.state = connStateRead
c.pathName = pathName
c.query = query
c.sconn = sconn
c.mutex.Unlock()

Expand Down Expand Up @@ -434,6 +437,7 @@ func (c *conn) apiItem() *defs.APISRTConn {
}
}(),
Path: c.pathName,
Query: c.query,
BytesReceived: bytesReceived,
BytesSent: bytesSent,
}
Expand Down
1 change: 1 addition & 0 deletions internal/servers/webrtc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func (s *session) apiItem() *defs.APIWebRTCSession {
return defs.APIWebRTCSessionStateRead
}(),
Path: s.req.pathName,
Query: s.req.query,
BytesReceived: bytesReceived,
BytesSent: bytesSent,
}
Expand Down
Loading