Skip to content

Commit

Permalink
use new time decoder in place of deprecated one (#643)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Nov 4, 2024
1 parent 9ac016c commit 01fd803
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Features:

## RTP Payload Formats

In RTSP, media streams are routed between server and clients by using RTP packets, which are encoded in a specific, codec-dependent, format. This library supports formats for the following codecs:
In RTSP, media streams are transmitted by using RTP packets, which are encoded in a specific, codec-dependent, format. This library supports formats for the following codecs:

### Video

Expand Down
25 changes: 18 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
)

// avoid an int64 overflow and preserve resolution by splitting division into two parts:
// first add the integer part, then the decimal part.
func multiplyAndDivide(v, m, d time.Duration) time.Duration {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}

// convert an URL into an address, in particular:
// * add default port
// * handle IPv6 with or without square brackets.
Expand Down Expand Up @@ -329,8 +337,7 @@ type Client struct {
closeError error
writer asyncProcessor
reader *clientReader
timeDecoder *rtptime.GlobalDecoder
timeDecoder2 *rtptime.GlobalDecoder2
timeDecoder *rtptime.GlobalDecoder2
mustClose bool

// in
Expand Down Expand Up @@ -812,8 +819,7 @@ func (c *Client) startReadRoutines() {
c.writer.allocateBuffer(8)
}

c.timeDecoder = rtptime.NewGlobalDecoder()
c.timeDecoder2 = rtptime.NewGlobalDecoder2()
c.timeDecoder = rtptime.NewGlobalDecoder2()

for _, cm := range c.medias {
cm.start()
Expand Down Expand Up @@ -855,7 +861,6 @@ func (c *Client) stopReadRoutines() {
}

c.timeDecoder = nil
c.timeDecoder2 = nil
}

func (c *Client) startWriter() {
Expand Down Expand Up @@ -1900,15 +1905,21 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder.Decode(ct.format, pkt)

v, ok := c.timeDecoder.Decode(ct.format, pkt)
if !ok {
return 0, false
}

return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(ct.format.ClockRate())), true
}

// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder2.Decode(ct.format, pkt)
return c.timeDecoder.Decode(ct.format, pkt)
}

// PacketNTP returns the NTP timestamp of an incoming RTP packet.
Expand Down
54 changes: 7 additions & 47 deletions pkg/rtptime/global_decoder.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rtptime

import (
"sync"
"time"

"github.com/pion/rtp"
Expand Down Expand Up @@ -45,6 +44,8 @@ func (d *globalDecoderTrackData) decode(ts uint32) time.Duration {
}

// GlobalDecoderTrack is a track (RTSP format or WebRTC track) of a GlobalDecoder.
//
// Deprecated: replaced by GlobalDecoderTrack2
type GlobalDecoderTrack interface {
ClockRate() int
PTSEqualsDTS(*rtp.Packet) bool
Expand All @@ -54,19 +55,15 @@ type GlobalDecoderTrack interface {
//
// Deprecated: replaced by GlobalDecoder2.
type GlobalDecoder struct {
mutex sync.Mutex
leadingTrack GlobalDecoderTrack
startNTP time.Time
startPTS time.Duration
tracks map[GlobalDecoderTrack]*globalDecoderTrackData
wrapped *GlobalDecoder2
}

// NewGlobalDecoder allocates a GlobalDecoder.
//
// Deprecated: replaced by NewGlobalDecoder2.
func NewGlobalDecoder() *GlobalDecoder {
return &GlobalDecoder{
tracks: make(map[GlobalDecoderTrack]*globalDecoderTrackData),
wrapped: NewGlobalDecoder2(),
}
}

Expand All @@ -75,47 +72,10 @@ func (d *GlobalDecoder) Decode(
track GlobalDecoderTrack,
pkt *rtp.Packet,
) (time.Duration, bool) {
if track.ClockRate() == 0 {
return 0, false
}

d.mutex.Lock()
defer d.mutex.Unlock()

df, ok := d.tracks[track]

// track never seen before
v, ok := d.wrapped.Decode(track, pkt)
if !ok {
if !track.PTSEqualsDTS(pkt) {
return 0, false
}

now := timeNow()

if d.leadingTrack == nil {
d.leadingTrack = track
d.startNTP = now
d.startPTS = 0
}

df = newGlobalDecoderTrackData(
d.startPTS+now.Sub(d.startNTP),
track.ClockRate(),
pkt.Timestamp)

d.tracks[track] = df

return df.startPTS, true
}

pts := df.decode(pkt.Timestamp)

// update startNTP / startPTS
if d.leadingTrack == track && track.PTSEqualsDTS(pkt) {
now := timeNow()
d.startNTP = now
d.startPTS = pts
return 0, false
}

return pts, true
return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(track.ClockRate())), true
}
20 changes: 11 additions & 9 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ type ServerSession struct {
udpLastPacketTime *int64 // publish
udpCheckStreamTimer *time.Timer
writer asyncProcessor
timeDecoder *rtptime.GlobalDecoder
timeDecoder2 *rtptime.GlobalDecoder2
timeDecoder *rtptime.GlobalDecoder2

// in
chHandleRequest chan sessionRequestReq
Expand Down Expand Up @@ -952,8 +951,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v

ss.timeDecoder = rtptime.NewGlobalDecoder()
ss.timeDecoder2 = rtptime.NewGlobalDecoder2()
ss.timeDecoder = rtptime.NewGlobalDecoder2()

for _, sm := range ss.setuppedMedias {
sm.start()
Expand Down Expand Up @@ -1039,8 +1037,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v

ss.timeDecoder = rtptime.NewGlobalDecoder()
ss.timeDecoder2 = rtptime.NewGlobalDecoder2()
ss.timeDecoder = rtptime.NewGlobalDecoder2()

for _, sm := range ss.setuppedMedias {
sm.start()
Expand Down Expand Up @@ -1095,7 +1092,6 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}

ss.timeDecoder = nil
ss.timeDecoder2 = nil

switch ss.state {
case ServerSessionStatePlay:
Expand Down Expand Up @@ -1268,15 +1264,21 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe
func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return ss.timeDecoder.Decode(sf.format, pkt)

v, ok := ss.timeDecoder.Decode(sf.format, pkt)
if !ok {
return 0, false
}

return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(sf.format.ClockRate())), true
}

// PacketPTS2 returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (ss *ServerSession) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return ss.timeDecoder2.Decode(sf.format, pkt)
return ss.timeDecoder.Decode(sf.format, pkt)
}

// PacketNTP returns the NTP timestamp of an incoming RTP packet.
Expand Down

0 comments on commit 01fd803

Please sign in to comment.