Skip to content

Commit

Permalink
server: add ServerSession.SetuppedStream, ServerSession.SetuppedPath,…
Browse files Browse the repository at this point in the history
… ServerSession.SetuppedQuery
  • Loading branch information
aler9 committed Oct 17, 2023
1 parent 90dec27 commit c1fa13e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 13 deletions.
19 changes: 18 additions & 1 deletion server_play_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func absoluteControlAttribute(md *psdp.MediaDescription) string {
func doDescribe(t *testing.T, conn *conn.Conn) *sdp.SessionDescription {
res, err := writeReqReadRes(conn, base.Request{
Method: base.Describe,
URL: mustParseURL("rtsp://localhost:8554/teststream"),
URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
},
Expand Down Expand Up @@ -553,6 +553,23 @@ func TestServerPlay(t *testing.T) {
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
switch transport {
case "udp":
v := TransportUDP
require.Equal(t, &v, ctx.Session.SetuppedTransport())

case "tcp", "tls":
v := TransportTCP
require.Equal(t, &v, ctx.Session.SetuppedTransport())

case "multicast":
v := TransportUDPMulticast
require.Equal(t, &v, ctx.Session.SetuppedTransport())
}

require.Equal(t, "param=value", ctx.Session.SetuppedQuery())
require.Equal(t, stream.Description().Medias, ctx.Session.SetuppedMedias())

// send RTCP packets directly to the session.
// these are sent after the response, only if onPlay returns StatusOK.
if transport != "multicast" {
Expand Down
17 changes: 15 additions & 2 deletions server_record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,19 @@ func TestServerRecord(t *testing.T) {
}, nil, nil
},
onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) {
switch transport {
case "udp":
v := TransportUDP
require.Equal(t, &v, ctx.Session.SetuppedTransport())

case "tcp", "tls":
v := TransportTCP
require.Equal(t, &v, ctx.Session.SetuppedTransport())
}

require.Equal(t, "param=value", ctx.Session.SetuppedQuery())
require.Equal(t, ctx.Session.AnnouncedDescription().Medias, ctx.Session.SetuppedMedias())

// queue sending of RTCP packets.
// these are sent after the response, only if onRecord returns StatusOK.
err := ctx.Session.WritePacketRTCP(ctx.Session.AnnouncedDescription().Medias[0], &testRTCPPacket)
Expand Down Expand Up @@ -602,7 +615,7 @@ func TestServerRecord(t *testing.T) {
},
}

doAnnounce(t, conn, "rtsp://localhost:8554/teststream", medias)
doAnnounce(t, conn, "rtsp://localhost:8554/teststream?param=value", medias)

<-sessionOpened

Expand Down Expand Up @@ -639,7 +652,7 @@ func TestServerRecord(t *testing.T) {
inTH.InterleavedIDs = &[2]int{2 + i*2, 3 + i*2}
}

res, th := doSetup(t, conn, "rtsp://localhost:8554/teststream/"+medias[i].Control, inTH, "")
res, th := doSetup(t, conn, "rtsp://localhost:8554/teststream?param=value/"+medias[i].Control, inTH, "")

session = readSession(t, res)

Expand Down
36 changes: 26 additions & 10 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type ServerSession struct {
tcpCallbackByChannel map[int]readFunc
setuppedTransport *Transport
setuppedStream *ServerStream // read
setuppedPath *string
setuppedPath string
setuppedQuery string
lastRequestTime time.Time
tcpConn *ServerConn
Expand Down Expand Up @@ -266,6 +266,21 @@ func (ss *ServerSession) SetuppedTransport() *Transport {
return ss.setuppedTransport
}

// SetuppedStream returns the stream associated with the session.
func (ss *ServerSession) SetuppedStream() *ServerStream {
return ss.setuppedStream

Check warning on line 271 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}

// SetuppedPath returns the path sent during SETUP or ANNOUNCE.
func (ss *ServerSession) SetuppedPath() string {
return ss.setuppedPath

Check warning on line 276 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L275-L276

Added lines #L275 - L276 were not covered by tests
}

// SetuppedQuery returns the query sent during SETUP or ANNOUNCE.
func (ss *ServerSession) SetuppedQuery() string {
return ss.setuppedQuery
}

// AnnouncedDescription returns the announced stream description.
func (ss *ServerSession) AnnouncedDescription() *description.Session {
return ss.announcedDesc
Expand Down Expand Up @@ -624,7 +639,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}

ss.state = ServerSessionStatePreRecord
ss.setuppedPath = &path
ss.setuppedPath = path
ss.setuppedQuery = query
ss.announcedDesc = &desc

Expand Down Expand Up @@ -670,14 +685,14 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}, err
}

if ss.setuppedPath != nil && path != *ss.setuppedPath {
if ss.state == ServerSessionStatePrePlay && path != ss.setuppedPath {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerMediasDifferentPaths{}
}

default: // record
path = *ss.setuppedPath
path = ss.setuppedPath
query = ss.setuppedQuery
}

Expand Down Expand Up @@ -799,7 +814,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}

ss.state = ServerSessionStatePrePlay
ss.setuppedPath = &path
ss.setuppedPath = path
ss.setuppedQuery = query
ss.setuppedStream = stream
}

Expand Down Expand Up @@ -886,10 +902,10 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}, err
}

if ss.State() == ServerSessionStatePrePlay && path != *ss.setuppedPath {
if ss.State() == ServerSessionStatePrePlay && path != ss.setuppedPath {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerPathHasChanged{Prev: *ss.setuppedPath, Cur: path}
}, liberrors.ErrServerPathHasChanged{Prev: ss.setuppedPath, Cur: path}

Check warning on line 908 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L908

Added line #L908 was not covered by tests
}

// allocate writeBuffer before calling OnPlay().
Expand Down Expand Up @@ -950,7 +966,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.s.timeNow(),
ss.setuppedMediasOrdered,
ss.setuppedStream,
*ss.setuppedPath,
ss.setuppedPath,
req.URL)

if ok {
Expand Down Expand Up @@ -978,10 +994,10 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}, liberrors.ErrServerNotAllAnnouncedMediasSetup{}
}

if path != *ss.setuppedPath {
if path != ss.setuppedPath {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerPathHasChanged{Prev: *ss.setuppedPath, Cur: path}
}, liberrors.ErrServerPathHasChanged{Prev: ss.setuppedPath, Cur: path}

Check warning on line 1000 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L1000

Added line #L1000 was not covered by tests
}

// allocate writeBuffer before calling OnRecord().
Expand Down

0 comments on commit c1fa13e

Please sign in to comment.