Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtmp: fix publishing screen with iOS StreamLabs (#2352) #2611

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 72 additions & 62 deletions internal/protocols/rtmp/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
"github.com/bluenviron/mediamtx/internal/protocols/rtmp/message"
)

const (
analyzePeriod = 1 * time.Second
)

// OnDataAV1Func is the prototype of the callback passed to OnDataAV1().
type OnDataAV1Func func(pts time.Duration, tu [][]byte)

Expand Down Expand Up @@ -161,6 +165,9 @@
return nil, nil, fmt.Errorf("metadata doesn't contain any track")
}

firstReceived := false
var startTime time.Duration

for {
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
Expand All @@ -172,22 +179,27 @@
return nil, nil, err
}

switch tmsg := msg.(type) {
switch msg := msg.(type) {
case *message.Video:
if !hasVideo {
return nil, nil, fmt.Errorf("unexpected video packet")
}

if !firstReceived {
firstReceived = true
startTime = msg.DTS
}

if videoTrack == nil {
if tmsg.Type == message.VideoTypeConfig {
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
if msg.Type == message.VideoTypeConfig {
videoTrack, err = trackFromH264DecoderConfig(msg.Payload)
if err != nil {
return nil, nil, err
}

// format used by OBS < 29.1 to publish H265
} else if tmsg.Type == message.VideoTypeAU && tmsg.IsKeyFrame {
nalus, err := h264.AVCCUnmarshal(tmsg.Payload)
} else if msg.Type == message.VideoTypeAU && msg.IsKeyFrame {
nalus, err := h264.AVCCUnmarshal(msg.Payload)
if err != nil {
if err == h264.ErrAVCCNoNALUs {
continue
Expand Down Expand Up @@ -225,12 +237,17 @@
}
}

// video was found, but audio was not
if videoTrack != nil && (msg.DTS-startTime) >= analyzePeriod {
return videoTrack, nil, nil
}

case *message.ExtendedSequenceStart:
if videoTrack == nil {
switch tmsg.FourCC {
switch msg.FourCC {
case message.FourCCHEVC:
var hvcc mp4.HvcC
_, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &hvcc, mp4.Context{})
_, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &hvcc, mp4.Context{})
if err != nil {
return nil, nil, fmt.Errorf("invalid H265 configuration: %v", err)
}
Expand All @@ -251,7 +268,7 @@

case message.FourCCAV1:
var av1c mp4.Av1C
_, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &av1c, mp4.Context{})
_, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &av1c, mp4.Context{})
if err != nil {
return nil, nil, fmt.Errorf("invalid AV1 configuration: %v", err)
}
Expand All @@ -268,7 +285,7 @@

default: // VP9
var vpcc mp4.VpcC
_, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &vpcc, mp4.Context{})
_, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &vpcc, mp4.Context{})

Check warning on line 288 in internal/protocols/rtmp/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/reader.go#L288

Added line #L288 was not covered by tests
if err != nil {
return nil, nil, fmt.Errorf("invalid VP9 configuration: %v", err)
}
Expand All @@ -285,9 +302,9 @@
}

if audioTrack == nil &&
tmsg.Codec == message.CodecMPEG4Audio &&
tmsg.AACType == message.AudioAACTypeConfig {
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
msg.Codec == message.CodecMPEG4Audio &&
msg.AACType == message.AudioAACTypeConfig {
audioTrack, err = trackFromAACDecoderConfig(msg.Payload)
if err != nil {
return nil, nil, err
}
Expand All @@ -297,24 +314,24 @@
}

func tracksFromMessages(conn *Conn, msg message.Message) (format.Format, format.Format, error) {
var startTime *time.Duration
firstReceived := false
var startTime time.Duration
var videoTrack format.Format
var audioTrack format.Format

// analyze 1 second of packets
outer:
for {
switch tmsg := msg.(type) {
switch msg := msg.(type) {
case *message.Video:
if startTime == nil {
v := tmsg.DTS
startTime = &v
if !firstReceived {
firstReceived = true
startTime = msg.DTS
}

if tmsg.Type == message.VideoTypeConfig {
if msg.Type == message.VideoTypeConfig {
if videoTrack == nil {
var err error
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
videoTrack, err = trackFromH264DecoderConfig(msg.Payload)
if err != nil {
return nil, nil, err
}
Expand All @@ -326,20 +343,20 @@
}
}

if (tmsg.DTS - *startTime) >= 1*time.Second {
if (msg.DTS - startTime) >= analyzePeriod {
break outer
}

case *message.Audio:
if startTime == nil {
v := tmsg.DTS
startTime = &v
if !firstReceived {
firstReceived = true
startTime = msg.DTS
}

if tmsg.AACType == message.AudioAACTypeConfig {
if msg.AACType == message.AudioAACTypeConfig {
if audioTrack == nil {
var err error
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
audioTrack, err = trackFromAACDecoderConfig(msg.Payload)
if err != nil {
return nil, nil, err
}
Expand All @@ -351,7 +368,7 @@
}
}

if (tmsg.DTS - *startTime) >= 1*time.Second {
if (msg.DTS - startTime) >= analyzePeriod {
break outer
}
}
Expand Down Expand Up @@ -395,52 +412,45 @@
}

func (r *Reader) readTracks() (format.Format, format.Format, error) {
msg, err := func() (message.Message, error) {
for {
msg, err := r.conn.Read()
if err != nil {
return nil, err
}
for {
msg, err := r.conn.Read()
if err != nil {
return nil, nil, err
}

Check warning on line 419 in internal/protocols/rtmp/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/reader.go#L418-L419

Added lines #L418 - L419 were not covered by tests

// skip play start and data start
if cmd, ok := msg.(*message.CommandAMF0); ok && cmd.Name == "onStatus" {
continue
}
// skip play start and data start
if cmd, ok := msg.(*message.CommandAMF0); ok && cmd.Name == "onStatus" {
continue

Check warning on line 423 in internal/protocols/rtmp/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/reader.go#L423

Added line #L423 was not covered by tests
}

// skip RtmpSampleAccess
if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" {
continue
}
// skip RtmpSampleAccess
if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" {
continue

Check warning on line 429 in internal/protocols/rtmp/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/reader.go#L429

Added line #L429 was not covered by tests
}

return msg, nil
}
}()
if err != nil {
return nil, nil, err
}

if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
payload := data.Payload
if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
payload := data.Payload

if s, ok := payload[0].(string); ok && s == "@setDataFrame" {
payload = payload[1:]
}
if s, ok := payload[0].(string); ok && s == "@setDataFrame" {
payload = payload[1:]
}

if len(payload) >= 1 {
if s, ok := payload[0].(string); ok && s == "onMetaData" {
videoTrack, audioTrack, err := tracksFromMetadata(r.conn, payload[1:])
if err != nil {
return nil, nil, err
}
if len(payload) >= 1 {
if s, ok := payload[0].(string); ok && s == "onMetaData" {
videoTrack, audioTrack, err := tracksFromMetadata(r.conn, payload[1:])
if err != nil {
return nil, nil, err
}

Check warning on line 445 in internal/protocols/rtmp/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/reader.go#L444-L445

Added lines #L444 - L445 were not covered by tests

return videoTrack, audioTrack, nil
return videoTrack, audioTrack, nil
}
}
}
}

return tracksFromMessages(r.conn, msg)
return tracksFromMessages(r.conn, msg)
}
}

// Tracks returns detected tracks
Expand Down
83 changes: 83 additions & 0 deletions internal/protocols/rtmp/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,89 @@ func TestReadTracks(t *testing.T) {
},
},
},
{
"issue mediamtx/2352 (missing audio)",
&format.H264{
PayloadTyp: 96,
SPS: h264SPS,
PPS: h264PPS,
PacketizationMode: 1,
},
nil,
[]message.Message{
&message.DataAMF0{
ChunkStreamID: 8,
MessageStreamID: 0x1000000,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "audiodatarate",
V: float64(128),
},
{
K: "framerate",
V: float64(30),
},
{
K: "videocodecid",
V: float64(7),
},
{
K: "videodatarate",
V: float64(2500),
},
{
K: "audiocodecid",
V: float64(10),
},
{
K: "height",
V: float64(720),
},
{
K: "width",
V: float64(1280),
},
},
},
},
&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: func() []byte {
buf, _ := h264conf.Conf{
SPS: h264SPS,
PPS: h264PPS,
}.Marshal()
return buf
}(),
},
&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: 0x7,
IsKeyFrame: true,
Payload: []uint8{
5,
},
},
&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: 0x7,
IsKeyFrame: true,
DTS: 2 * time.Second,
Payload: []uint8{
5,
},
},
},
},
} {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
Expand Down
Loading