From e7c7121d53d0b59046e4eb3d1e7894db1e003487 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 30 Oct 2023 11:38:42 +0100 Subject: [PATCH] rtmp: fix publishing screen with iOS StreamLabs (#2352) --- internal/protocols/rtmp/reader.go | 134 +++++++++++++------------ internal/protocols/rtmp/reader_test.go | 83 +++++++++++++++ 2 files changed, 155 insertions(+), 62 deletions(-) diff --git a/internal/protocols/rtmp/reader.go b/internal/protocols/rtmp/reader.go index d6d3d7466e8..7ccc741dde2 100644 --- a/internal/protocols/rtmp/reader.go +++ b/internal/protocols/rtmp/reader.go @@ -17,6 +17,10 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/message" ) +const ( + analyzePeriod = 1 * time.Second +) + // OnDataAV1Func is the prototype of the callback passed to OnDataAV1(). type OnDataAV1Func func(pts time.Duration, tu [][]byte) @@ -161,6 +165,9 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma return nil, nil, fmt.Errorf("metadata doesn't contain any track") } + firstReceived := false + var startTime time.Duration + for { if (!hasVideo || videoTrack != nil) && (!hasAudio || audioTrack != nil) { @@ -172,22 +179,27 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma return nil, nil, err } - switch tmsg := msg.(type) { + switch msg := msg.(type) { case *message.Video: if !hasVideo { return nil, nil, fmt.Errorf("unexpected video packet") } + if !firstReceived { + firstReceived = true + startTime = msg.DTS + } + if videoTrack == nil { - if tmsg.Type == message.VideoTypeConfig { - videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload) + if msg.Type == message.VideoTypeConfig { + videoTrack, err = trackFromH264DecoderConfig(msg.Payload) if err != nil { return nil, nil, err } // format used by OBS < 29.1 to publish H265 - } else if tmsg.Type == message.VideoTypeAU && tmsg.IsKeyFrame { - nalus, err := h264.AVCCUnmarshal(tmsg.Payload) + } else if msg.Type == message.VideoTypeAU && msg.IsKeyFrame { + nalus, err := h264.AVCCUnmarshal(msg.Payload) if err != nil { if err == h264.ErrAVCCNoNALUs { continue @@ -225,12 +237,17 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma } } + // video was found, but audio was not + if videoTrack != nil && (msg.DTS-startTime) >= analyzePeriod { + return videoTrack, nil, nil + } + case *message.ExtendedSequenceStart: if videoTrack == nil { - switch tmsg.FourCC { + switch msg.FourCC { case message.FourCCHEVC: var hvcc mp4.HvcC - _, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &hvcc, mp4.Context{}) + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &hvcc, mp4.Context{}) if err != nil { return nil, nil, fmt.Errorf("invalid H265 configuration: %v", err) } @@ -251,7 +268,7 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma case message.FourCCAV1: var av1c mp4.Av1C - _, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &av1c, mp4.Context{}) + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &av1c, mp4.Context{}) if err != nil { return nil, nil, fmt.Errorf("invalid AV1 configuration: %v", err) } @@ -268,7 +285,7 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma default: // VP9 var vpcc mp4.VpcC - _, err := mp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &vpcc, mp4.Context{}) + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &vpcc, mp4.Context{}) if err != nil { return nil, nil, fmt.Errorf("invalid VP9 configuration: %v", err) } @@ -285,9 +302,9 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma } if audioTrack == nil && - tmsg.Codec == message.CodecMPEG4Audio && - tmsg.AACType == message.AudioAACTypeConfig { - audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload) + msg.Codec == message.CodecMPEG4Audio && + msg.AACType == message.AudioAACTypeConfig { + audioTrack, err = trackFromAACDecoderConfig(msg.Payload) if err != nil { return nil, nil, err } @@ -297,24 +314,24 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma } func tracksFromMessages(conn *Conn, msg message.Message) (format.Format, format.Format, error) { - var startTime *time.Duration + firstReceived := false + var startTime time.Duration var videoTrack format.Format var audioTrack format.Format - // analyze 1 second of packets outer: for { - switch tmsg := msg.(type) { + switch msg := msg.(type) { case *message.Video: - if startTime == nil { - v := tmsg.DTS - startTime = &v + if !firstReceived { + firstReceived = true + startTime = msg.DTS } - if tmsg.Type == message.VideoTypeConfig { + if msg.Type == message.VideoTypeConfig { if videoTrack == nil { var err error - videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload) + videoTrack, err = trackFromH264DecoderConfig(msg.Payload) if err != nil { return nil, nil, err } @@ -326,20 +343,20 @@ outer: } } - if (tmsg.DTS - *startTime) >= 1*time.Second { + if (msg.DTS - startTime) >= analyzePeriod { break outer } case *message.Audio: - if startTime == nil { - v := tmsg.DTS - startTime = &v + if !firstReceived { + firstReceived = true + startTime = msg.DTS } - if tmsg.AACType == message.AudioAACTypeConfig { + if msg.AACType == message.AudioAACTypeConfig { if audioTrack == nil { var err error - audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload) + audioTrack, err = trackFromAACDecoderConfig(msg.Payload) if err != nil { return nil, nil, err } @@ -351,7 +368,7 @@ outer: } } - if (tmsg.DTS - *startTime) >= 1*time.Second { + if (msg.DTS - startTime) >= analyzePeriod { break outer } } @@ -395,52 +412,45 @@ func NewReader(conn *Conn) (*Reader, error) { } func (r *Reader) readTracks() (format.Format, format.Format, error) { - msg, err := func() (message.Message, error) { - for { - msg, err := r.conn.Read() - if err != nil { - return nil, err - } + for { + msg, err := r.conn.Read() + if err != nil { + return nil, nil, err + } - // skip play start and data start - if cmd, ok := msg.(*message.CommandAMF0); ok && cmd.Name == "onStatus" { - continue - } + // skip play start and data start + if cmd, ok := msg.(*message.CommandAMF0); ok && cmd.Name == "onStatus" { + continue + } - // skip RtmpSampleAccess - if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 { - if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" { - continue - } + // skip RtmpSampleAccess + if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 { + if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" { + continue } - - return msg, nil } - }() - if err != nil { - return nil, nil, err - } - if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 { - payload := data.Payload + if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 { + payload := data.Payload - if s, ok := payload[0].(string); ok && s == "@setDataFrame" { - payload = payload[1:] - } + if s, ok := payload[0].(string); ok && s == "@setDataFrame" { + payload = payload[1:] + } - if len(payload) >= 1 { - if s, ok := payload[0].(string); ok && s == "onMetaData" { - videoTrack, audioTrack, err := tracksFromMetadata(r.conn, payload[1:]) - if err != nil { - return nil, nil, err - } + if len(payload) >= 1 { + if s, ok := payload[0].(string); ok && s == "onMetaData" { + videoTrack, audioTrack, err := tracksFromMetadata(r.conn, payload[1:]) + if err != nil { + return nil, nil, err + } - return videoTrack, audioTrack, nil + return videoTrack, audioTrack, nil + } } } - } - return tracksFromMessages(r.conn, msg) + return tracksFromMessages(r.conn, msg) + } } // Tracks returns detected tracks diff --git a/internal/protocols/rtmp/reader_test.go b/internal/protocols/rtmp/reader_test.go index 029bde288cc..312327f20b5 100644 --- a/internal/protocols/rtmp/reader_test.go +++ b/internal/protocols/rtmp/reader_test.go @@ -679,6 +679,89 @@ func TestReadTracks(t *testing.T) { }, }, }, + { + "issue mediamtx/2352 (missing audio)", + &format.H264{ + PayloadTyp: 96, + SPS: h264SPS, + PPS: h264PPS, + PacketizationMode: 1, + }, + nil, + []message.Message{ + &message.DataAMF0{ + ChunkStreamID: 8, + MessageStreamID: 0x1000000, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + { + K: "audiodatarate", + V: float64(128), + }, + { + K: "framerate", + V: float64(30), + }, + { + K: "videocodecid", + V: float64(7), + }, + { + K: "videodatarate", + V: float64(2500), + }, + { + K: "audiocodecid", + V: float64(10), + }, + { + K: "height", + V: float64(720), + }, + { + K: "width", + V: float64(1280), + }, + }, + }, + }, + &message.Video{ + ChunkStreamID: message.VideoChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecH264, + IsKeyFrame: true, + Type: message.VideoTypeConfig, + Payload: func() []byte { + buf, _ := h264conf.Conf{ + SPS: h264SPS, + PPS: h264PPS, + }.Marshal() + return buf + }(), + }, + &message.Video{ + ChunkStreamID: message.VideoChunkStreamID, + MessageStreamID: 0x1000000, + Codec: 0x7, + IsKeyFrame: true, + Payload: []uint8{ + 5, + }, + }, + &message.Video{ + ChunkStreamID: message.VideoChunkStreamID, + MessageStreamID: 0x1000000, + Codec: 0x7, + IsKeyFrame: true, + DTS: 2 * time.Second, + Payload: []uint8{ + 5, + }, + }, + }, + }, } { t.Run(ca.name, func(t *testing.T) { var buf bytes.Buffer