From 27975d8b67f9030f0162d98c3bfccdb23163dbb3 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 7 Jan 2024 17:02:22 +0100 Subject: [PATCH] rtmp: support publishing G711 and LPCM tracks (#2857) (#2884) --- README.md | 16 +- go.mod | 2 +- go.sum | 4 +- internal/formatprocessor/ac3.go | 5 +- internal/formatprocessor/av1.go | 5 +- internal/formatprocessor/av1_test.go | 1 - internal/formatprocessor/g711.go | 6 +- internal/formatprocessor/g711_test.go | 62 +++++++ internal/formatprocessor/h264.go | 10 +- internal/formatprocessor/h265.go | 10 +- internal/formatprocessor/lpcm.go | 8 +- internal/formatprocessor/lpcm_test.go | 37 ++++ internal/formatprocessor/mjpeg.go | 5 +- internal/formatprocessor/mpeg1_audio.go | 5 +- internal/formatprocessor/mpeg1_video.go | 5 +- internal/formatprocessor/mpeg4_audio.go | 5 +- internal/formatprocessor/mpeg4_video.go | 2 +- internal/formatprocessor/vp8.go | 5 +- internal/formatprocessor/vp9.go | 5 +- internal/protocols/rtmp/chunk/chunk_test.go | 28 +++ .../fuzz/FuzzChunk0Read/582528ddfad69eb5 | 2 + .../fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 | 2 + .../fuzz/FuzzChunk1Read/553384c8664fe971 | 2 + .../fuzz/FuzzChunk1Read/582528ddfad69eb5 | 2 + .../fuzz/FuzzChunk2Read/582528ddfad69eb5 | 2 + .../fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba | 2 + .../fuzz/FuzzChunk3Read/582528ddfad69eb5 | 2 + .../fuzz/FuzzChunk3Read/caf81e9797b19c76 | 2 + internal/protocols/rtmp/message/audio.go | 39 +++- .../protocols/rtmp/message/reader_test.go | 12 +- internal/protocols/rtmp/message/video.go | 8 +- internal/protocols/rtmp/reader.go | 109 +++++++++++- internal/protocols/rtmp/reader_test.go | 167 +++++++++++++++--- internal/protocols/rtmp/writer.go | 47 +++-- internal/protocols/rtmp/writer_test.go | 6 +- internal/servers/rtmp/conn.go | 22 +++ 36 files changed, 525 insertions(+), 127 deletions(-) delete mode 100644 internal/formatprocessor/av1_test.go create mode 100644 internal/formatprocessor/g711_test.go create mode 100644 internal/formatprocessor/lpcm_test.go create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5 create mode 100644 internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76 diff --git a/README.md b/README.md index cccbe8ad2a7..45356d2bc4e 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,11 @@ Live streams can be published to the server with: |--------|--------|------------|------------| |[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[SRT cameras and servers](#srt-cameras-and-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| -|[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| -|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| -|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)| +|[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)| +|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec| +|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec| +|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| |[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| @@ -37,8 +37,8 @@ And can be read from the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| |[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| -|[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| +|[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)| +|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| @@ -46,7 +46,7 @@ And can be recorded with: |format|video codecs|audio codecs| |------|------------|------------| -|[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711, LPCM| +|[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711 (PCMA, PCMU), LPCM| |[MPEG-TS](#record-streams-to-disk)|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| **Features** diff --git a/go.mod b/go.mod index 6e222f496bd..2f6b6c6fdca 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/alecthomas/kong v0.8.1 github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.2.0 - github.com/bluenviron/gortsplib/v4 v4.6.2 + github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1 github.com/datarhei/gosrt v0.5.5 github.com/fsnotify/fsnotify v1.7.0 diff --git a/go.sum b/go.sum index e646db640a0..21a2d99f7e2 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib v1.2.0 h1:Hrx2/n/AcmKKIV+MjZLKs5kmW+O7xCdUSPJQoS39JKw= github.com/bluenviron/gohlslib v1.2.0/go.mod h1:kG/Sjebsxnf5asMGaGcQ0aSvtFGNChJPgctds2wDHOI= -github.com/bluenviron/gortsplib/v4 v4.6.2 h1:CGIsxpnUFvSlIxnSFS0oFSSfwsHMmBCmYcrGAtIcwXc= -github.com/bluenviron/gortsplib/v4 v4.6.2/go.mod h1:dN1YjyPNMfy/NwC17Ga6MiIMiUoQfg5GL7LGsVHa0Jo= +github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe h1:eVx4BU4mF26UK/SQlsnIkeCIraQR8AcJf42ymYR3pQE= +github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe/go.mod h1:UqdkRR5YvKHP/wHEQQySJFKJm6tIZcftdP7cNszIZ1g= github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1 h1:f8XDAHvgPbT+n5qf73REdUo9kLmGpP4HNgphKI/8fGE= github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1/go.mod h1:Ij/kE1LEucSjryNBVTyPL/gBI0d6/Css3f5PyrM957w= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= diff --git a/internal/formatprocessor/ac3.go b/internal/formatprocessor/ac3.go index 95bf02c9cae..20badf04e7a 100644 --- a/internal/formatprocessor/ac3.go +++ b/internal/formatprocessor/ac3.go @@ -53,14 +53,13 @@ func (t *formatProcessorAC3) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index 6d4898b8c8a..2bc8bae66f7 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -55,14 +55,13 @@ func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/av1_test.go b/internal/formatprocessor/av1_test.go deleted file mode 100644 index 136e2bf8332..00000000000 --- a/internal/formatprocessor/av1_test.go +++ /dev/null @@ -1 +0,0 @@ -package formatprocessor diff --git a/internal/formatprocessor/g711.go b/internal/formatprocessor/g711.go index ffa39d8c9af..a1b83843821 100644 --- a/internal/formatprocessor/g711.go +++ b/internal/formatprocessor/g711.go @@ -41,6 +41,7 @@ func newG711( func (t *formatProcessorG711) createEncoder() error { t.encoder = &rtpsimpleaudio.Encoder{ PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: t.format.PayloadType(), } return t.encoder.Init() } @@ -52,11 +53,10 @@ func (t *formatProcessorG711) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = []*rtp.Packet{pkt} ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - pkt.Timestamp += ts - - u.RTPPackets = []*rtp.Packet{pkt} + u.RTPPackets[0].Timestamp += ts return nil } diff --git a/internal/formatprocessor/g711_test.go b/internal/formatprocessor/g711_test.go new file mode 100644 index 00000000000..85211403931 --- /dev/null +++ b/internal/formatprocessor/g711_test.go @@ -0,0 +1,62 @@ +package formatprocessor + +import ( + "testing" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestG611Encode(t *testing.T) { + t.Run("alaw", func(t *testing.T) { + forma := &format.G711{ + MULaw: false, + } + + p, err := New(1472, forma, true) + require.NoError(t, err) + + unit := &unit.G711{ + Samples: []byte{1, 2, 3, 4}, + } + + err = p.ProcessUnit(unit) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + Version: 2, + PayloadType: 8, + SequenceNumber: unit.RTPPackets[0].SequenceNumber, + SSRC: unit.RTPPackets[0].SSRC, + }, + Payload: []byte{1, 2, 3, 4}, + }}, unit.RTPPackets) + }) + + t.Run("mulaw", func(t *testing.T) { + forma := &format.G711{ + MULaw: true, + } + + p, err := New(1472, forma, true) + require.NoError(t, err) + + unit := &unit.G711{ + Samples: []byte{1, 2, 3, 4}, + } + + err = p.ProcessUnit(unit) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + Version: 2, + PayloadType: 0, + SequenceNumber: unit.RTPPackets[0].SequenceNumber, + SSRC: unit.RTPPackets[0].SSRC, + }, + Payload: []byte{1, 2, 3, 4}, + }}, unit.RTPPackets) + }) +} diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index 19c3691a043..e34aef530d6 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -224,13 +224,12 @@ func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error { if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - - u.RTPPackets = pkts } return nil @@ -306,12 +305,11 @@ func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl if err != nil { return nil, err } + u.RTPPackets = pkts - for _, newPKT := range pkts { + for _, newPKT := range u.RTPPackets { newPKT.Timestamp = pkt.Timestamp } - - u.RTPPackets = pkts } return u, nil diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index 486b6943d76..74b8b80b5f5 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -243,13 +243,12 @@ func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - - u.RTPPackets = pkts } return nil @@ -325,12 +324,11 @@ func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl if err != nil { return nil, err } + u.RTPPackets = pkts - for _, newPKT := range pkts { + for _, newPKT := range u.RTPPackets { newPKT.Timestamp = pkt.Timestamp } - - u.RTPPackets = pkts } return u, nil diff --git a/internal/formatprocessor/lpcm.go b/internal/formatprocessor/lpcm.go index b43f2e1f42a..c26d03bd716 100644 --- a/internal/formatprocessor/lpcm.go +++ b/internal/formatprocessor/lpcm.go @@ -41,6 +41,9 @@ func newLPCM( func (t *formatProcessorLPCM) createEncoder() error { t.encoder = &rtplpcm.Encoder{ PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: t.format.PayloadTyp, + BitDepth: t.format.BitDepth, + ChannelCount: t.format.ChannelCount, } return t.encoder.Init() } @@ -52,14 +55,13 @@ func (t *formatProcessorLPCM) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/lpcm_test.go b/internal/formatprocessor/lpcm_test.go new file mode 100644 index 00000000000..a217a38f35d --- /dev/null +++ b/internal/formatprocessor/lpcm_test.go @@ -0,0 +1,37 @@ +package formatprocessor + +import ( + "testing" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestLPCMEncode(t *testing.T) { + forma := &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + ChannelCount: 2, + } + + p, err := New(1472, forma, true) + require.NoError(t, err) + + unit := &unit.LPCM{ + Samples: []byte{1, 2, 3, 4}, + } + + err = p.ProcessUnit(unit) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: unit.RTPPackets[0].SequenceNumber, + SSRC: unit.RTPPackets[0].SSRC, + }, + Payload: []byte{1, 2, 3, 4}, + }}, unit.RTPPackets) +} diff --git a/internal/formatprocessor/mjpeg.go b/internal/formatprocessor/mjpeg.go index 573019194bb..90c5cce25d5 100644 --- a/internal/formatprocessor/mjpeg.go +++ b/internal/formatprocessor/mjpeg.go @@ -54,14 +54,13 @@ func (t *formatProcessorMJPEG) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/mpeg1_audio.go b/internal/formatprocessor/mpeg1_audio.go index 0ee764aa915..6a02f62bd8d 100644 --- a/internal/formatprocessor/mpeg1_audio.go +++ b/internal/formatprocessor/mpeg1_audio.go @@ -53,14 +53,13 @@ func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:d if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/mpeg1_video.go b/internal/formatprocessor/mpeg1_video.go index 47be0e30d33..2ce207f8f9c 100644 --- a/internal/formatprocessor/mpeg1_video.go +++ b/internal/formatprocessor/mpeg1_video.go @@ -54,14 +54,13 @@ func (t *formatProcessorMPEG1Video) ProcessUnit(uu unit.Unit) error { //nolint:d if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/mpeg4_audio.go b/internal/formatprocessor/mpeg4_audio.go index bcdac2d4a51..6ca41deaf36 100644 --- a/internal/formatprocessor/mpeg4_audio.go +++ b/internal/formatprocessor/mpeg4_audio.go @@ -57,14 +57,13 @@ func (t *formatProcessorMPEG4Audio) ProcessUnit(uu unit.Unit) error { //nolint:d if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/mpeg4_video.go b/internal/formatprocessor/mpeg4_video.go index 88872d0e458..56786761acc 100644 --- a/internal/formatprocessor/mpeg4_video.go +++ b/internal/formatprocessor/mpeg4_video.go @@ -94,7 +94,7 @@ func (t *formatProcessorMPEG4Video) ProcessUnit(uu unit.Unit) error { //nolint:d } ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 990af8cbd2b..3b18397fb0f 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -54,14 +54,13 @@ func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index 91688cb5f4c..a6af1cd6ff4 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -54,14 +54,13 @@ func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) - for _, pkt := range pkts { + for _, pkt := range u.RTPPackets { pkt.Timestamp += ts } - u.RTPPackets = pkts - return nil } diff --git a/internal/protocols/rtmp/chunk/chunk_test.go b/internal/protocols/rtmp/chunk/chunk_test.go index b7507e3d530..9748aaae1d2 100644 --- a/internal/protocols/rtmp/chunk/chunk_test.go +++ b/internal/protocols/rtmp/chunk/chunk_test.go @@ -156,3 +156,31 @@ func TestChunkMarshal(t *testing.T) { }) } } + +func FuzzChunk0Read(f *testing.F) { + f.Fuzz(func(t *testing.T, b []byte) { + var chunk Chunk0 + chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck + }) +} + +func FuzzChunk1Read(f *testing.F) { + f.Fuzz(func(t *testing.T, b []byte) { + var chunk Chunk1 + chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck + }) +} + +func FuzzChunk2Read(f *testing.F) { + f.Fuzz(func(t *testing.T, b []byte) { + var chunk Chunk2 + chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck + }) +} + +func FuzzChunk3Read(f *testing.F) { + f.Fuzz(func(t *testing.T, b []byte) { + var chunk Chunk3 + chunk.Read(bytes.NewReader(b), 65536, true) //nolint:errcheck + }) +} diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5 new file mode 100644 index 00000000000..a96f5599e6b --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 new file mode 100644 index 00000000000..c9756ec7e5a --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0\xff\xff\xff00000000") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971 new file mode 100644 index 00000000000..26f98f8852d --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0\xff\xff\xff0000") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5 new file mode 100644 index 00000000000..a96f5599e6b --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5 new file mode 100644 index 00000000000..a96f5599e6b --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba new file mode 100644 index 00000000000..01533d52714 --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0\xff\xff\xff") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5 new file mode 100644 index 00000000000..a96f5599e6b --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0") diff --git a/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76 b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76 new file mode 100644 index 00000000000..67322c70489 --- /dev/null +++ b/internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("") diff --git a/internal/protocols/rtmp/message/audio.go b/internal/protocols/rtmp/message/audio.go index 074f8e0552c..06568983595 100644 --- a/internal/protocols/rtmp/message/audio.go +++ b/internal/protocols/rtmp/message/audio.go @@ -12,12 +12,35 @@ const ( AudioChunkStreamID = 4 ) -// supported audio codecs +// audio codecs const ( CodecMPEG1Audio = 2 + CodecLPCM = 3 + CodecPCMA = 7 + CodecPCMU = 8 CodecMPEG4Audio = 10 ) +// audio rates +const ( + Rate5512 = 0 + Rate11025 = 1 + Rate22050 = 2 + Rate44100 = 3 +) + +// audio depths +const ( + Depth8 = 0 + Depth16 = 1 +) + +// audio channels +const ( + ChannelsMono = 0 + ChannelsStereo = 1 +) + // AudioAACType is the AAC type of a Audio. type AudioAACType uint8 @@ -52,7 +75,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { m.Codec = raw.Body[0] >> 4 switch m.Codec { - case CodecMPEG1Audio, CodecMPEG4Audio: + case CodecMPEG4Audio, CodecMPEG1Audio, CodecPCMA, CodecPCMU, CodecLPCM: default: return fmt.Errorf("unsupported audio codec: %d", m.Codec) } @@ -61,9 +84,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { m.Depth = (raw.Body[0] >> 1) & 0x01 m.Channels = raw.Body[0] & 0x01 - if m.Codec == CodecMPEG1Audio { - m.Payload = raw.Body[1:] - } else { + if m.Codec == CodecMPEG4Audio { m.AACType = AudioAACType(raw.Body[1]) switch m.AACType { case AudioAACTypeConfig, AudioAACTypeAU: @@ -72,6 +93,8 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { } m.Payload = raw.Body[2:] + } else { + m.Payload = raw.Body[1:] } return nil @@ -93,11 +116,11 @@ func (m Audio) Marshal() (*rawmessage.Message, error) { body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels - if m.Codec == CodecMPEG1Audio { - copy(body[1:], m.Payload) - } else { + if m.Codec == CodecMPEG4Audio { body[1] = uint8(m.AACType) copy(body[2:], m.Payload) + } else { + copy(body[1:], m.Payload) } return &rawmessage.Message{ diff --git a/internal/protocols/rtmp/message/reader_test.go b/internal/protocols/rtmp/message/reader_test.go index 1add82a8ca9..a6d16fab9d3 100644 --- a/internal/protocols/rtmp/message/reader_test.go +++ b/internal/protocols/rtmp/message/reader_test.go @@ -33,9 +33,9 @@ var readWriterCases = []struct { DTS: 6013806 * time.Millisecond, MessageStreamID: 4534543, Codec: CodecMPEG1Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: Rate44100, + Depth: Depth16, + Channels: ChannelsStereo, Payload: []byte{0x01, 0x02, 0x03, 0x04}, }, []byte{ @@ -50,9 +50,9 @@ var readWriterCases = []struct { DTS: 6013806 * time.Millisecond, MessageStreamID: 4534543, Codec: CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: Rate44100, + Depth: Depth16, + Channels: ChannelsStereo, AACType: AudioAACTypeAU, Payload: []byte{0x5A, 0xC0, 0x77, 0x40}, }, diff --git a/internal/protocols/rtmp/message/video.go b/internal/protocols/rtmp/message/video.go index 239bbff7422..287b1099142 100644 --- a/internal/protocols/rtmp/message/video.go +++ b/internal/protocols/rtmp/message/video.go @@ -4,8 +4,6 @@ import ( "fmt" "time" - "github.com/notedit/rtmp/format/flv/flvio" - "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" ) @@ -51,7 +49,7 @@ func (m *Video) Unmarshal(raw *rawmessage.Message) error { return fmt.Errorf("invalid body size") } - m.IsKeyFrame = (raw.Body[0] >> 4) == flvio.FRAME_KEY + m.IsKeyFrame = (raw.Body[0] >> 4) == 1 m.Codec = raw.Body[0] & 0x0F switch m.Codec { @@ -83,9 +81,9 @@ func (m Video) Marshal() (*rawmessage.Message, error) { body := make([]byte, m.marshalBodySize()) if m.IsKeyFrame { - body[0] = flvio.FRAME_KEY << 4 + body[0] = 1 << 4 } else { - body[0] = flvio.FRAME_INTER << 4 + body[0] = 2 << 4 } body[0] |= m.Codec body[1] = uint8(m.Type) diff --git a/internal/protocols/rtmp/reader.go b/internal/protocols/rtmp/reader.go index daee97dbd02..61f9d6af77e 100644 --- a/internal/protocols/rtmp/reader.go +++ b/internal/protocols/rtmp/reader.go @@ -37,6 +37,12 @@ type OnDataMPEG4AudioFunc func(pts time.Duration, au []byte) // OnDataMPEG1AudioFunc is the prototype of the callback passed to OnDataMPEG1Audio(). type OnDataMPEG1AudioFunc func(pts time.Duration, frame []byte) +// OnDataG711Func is the prototype of the callback passed to OnDataG711(). +type OnDataG711Func func(pts time.Duration, samples []byte) + +// OnDataLPCMFunc is the prototype of the callback passed to OnDataLPCM(). +type OnDataLPCMFunc func(pts time.Duration, samples []byte) + func hasVideo(md flvio.AMFMap) (bool, error) { v, ok := md.GetV("videocodecid") if !ok { @@ -81,11 +87,25 @@ func hasAudio(md flvio.AMFMap, audioTrack *format.Format) (bool, error) { case 0: return false, nil + case message.CodecMPEG4Audio, message.CodecLPCM: + return true, nil + case message.CodecMPEG1Audio: *audioTrack = &format.MPEG1Audio{} return true, nil - case message.CodecMPEG4Audio: + case message.CodecPCMA: + v, ok := md.GetV("stereo") + if ok && v == true { + return false, fmt.Errorf("stereo PCMA is not supported") + } + return true, nil + + case message.CodecPCMU: + v, ok := md.GetV("stereo") + if ok && v == true { + return false, fmt.Errorf("stereo PCMU is not supported") + } return true, nil } @@ -95,7 +115,7 @@ func hasAudio(md flvio.AMFMap, audioTrack *format.Format) (bool, error) { } } - return false, fmt.Errorf("unsupported audio codec %v", v) + return false, fmt.Errorf("unsupported audio codec: %v", v) } func h265FindNALU(array []mp4.HEVCNaluArray, typ h265.NALUType) []byte { @@ -244,6 +264,10 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma } case *message.ExtendedSequenceStart: + if !hasVideo { + return nil, nil, fmt.Errorf("unexpected video packet") + } + if videoTrack == nil { switch msg.FourCC { case message.FourCCHEVC: @@ -302,12 +326,46 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma return nil, nil, fmt.Errorf("unexpected audio packet") } - if audioTrack == nil && - msg.Codec == message.CodecMPEG4Audio && - msg.AACType == message.AudioAACTypeConfig { - audioTrack, err = trackFromAACDecoderConfig(msg.Payload) - if err != nil { - return nil, nil, err + if audioTrack == nil { + switch { + case msg.Codec == message.CodecMPEG4Audio && + msg.AACType == message.AudioAACTypeConfig: + audioTrack, err = trackFromAACDecoderConfig(msg.Payload) + if err != nil { + return nil, nil, err + } + + case msg.Codec == message.CodecPCMA: + if msg.Channels == message.ChannelsStereo { + return nil, nil, fmt.Errorf("stereo PCMA is not supported") + } + + audioTrack = &format.G711{MULaw: false} + + case msg.Codec == message.CodecPCMU: + if msg.Channels == message.ChannelsStereo { + return nil, nil, fmt.Errorf("stereo PCMU is not supported") + } + + audioTrack = &format.G711{MULaw: true} + + case msg.Codec == message.CodecLPCM: + audioTrack = &format.LPCM{ + PayloadTyp: 96, + BitDepth: func() int { + if msg.Depth == message.Depth16 { + return 16 + } + return 8 + }(), + SampleRate: audioRateRTMPToInt(msg.Rate), + ChannelCount: func() int { + if msg.Channels == message.ChannelsMono { + return 1 + } + return 2 + }(), + } } } } @@ -580,6 +638,41 @@ func (r *Reader) OnDataMPEG1Audio(cb OnDataMPEG1AudioFunc) { } } +// OnDataG711 sets a callback that is called when G711 data is received. +func (r *Reader) OnDataG711(cb OnDataG711Func) { + r.onDataAudio = func(msg *message.Audio) error { + cb(msg.DTS, msg.Payload) + return nil + } +} + +// OnDataLPCM sets a callback that is called when LPCM data is received. +func (r *Reader) OnDataLPCM(cb OnDataLPCMFunc) { + bitDepth := r.audioTrack.(*format.LPCM).BitDepth + + if bitDepth == 16 { + r.onDataAudio = func(msg *message.Audio) error { + le := len(msg.Payload) + if le%2 != 0 { + return fmt.Errorf("invalid payload length: %d", le) + } + + // convert from little endian to big endian + for i := 0; i < le; i += 2 { + msg.Payload[i], msg.Payload[i+1] = msg.Payload[i+1], msg.Payload[i] + } + + cb(msg.DTS, msg.Payload) + return nil + } + } else { + r.onDataAudio = func(msg *message.Audio) error { + cb(msg.DTS, msg.Payload) + return nil + } + } +} + // Read reads data. func (r *Reader) Read() error { msg, err := r.conn.Read() diff --git a/internal/protocols/rtmp/reader_test.go b/internal/protocols/rtmp/reader_test.go index 312327f20b5..40776ceb0f7 100644 --- a/internal/protocols/rtmp/reader_test.go +++ b/internal/protocols/rtmp/reader_test.go @@ -109,7 +109,7 @@ func TestReadTracks(t *testing.T) { messages []message.Message }{ { - "video+audio", + "h264 + aac", &format.H264{ PayloadTyp: 96, SPS: h264SPS, @@ -172,9 +172,9 @@ func TestReadTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: func() []byte { enc, err := mpeg4audio.Config{ @@ -189,7 +189,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "video", + "h264", &format.H264{ PayloadTyp: 96, SPS: h264SPS, @@ -241,7 +241,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "issue mediamtx/386 (missing metadata), video+audio", + "h264 + aac, issue mediamtx/386 (missing metadata)", &format.H264{ PayloadTyp: 96, SPS: h264SPS, @@ -292,9 +292,9 @@ func TestReadTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: func() []byte { enc, err := mpeg4audio.Config{ @@ -309,7 +309,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "issue mediamtx/386 (missing metadata), audio", + "aac, issue mediamtx/386 (missing metadata)", nil, &format.MPEG4Audio{ PayloadTyp: 96, @@ -327,9 +327,9 @@ func TestReadTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: func() []byte { enc, err := mpeg4audio.Config{ @@ -345,9 +345,9 @@ func TestReadTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: func() []byte { enc, err := mpeg4audio.Config{ @@ -363,7 +363,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "obs studio pre 29.1 h265", + "h265 + aac, obs studio pre 29.1 h265", &format.H265{ PayloadTyp: 96, VPS: h265VPS, @@ -428,9 +428,9 @@ func TestReadTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: func() []byte { enc, err := mpeg4audio.Config{ @@ -445,7 +445,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "issue mediamtx/2232 (xsplit broadcaster)", + "h265, issue mediamtx/2232 (xsplit broadcaster)", &format.H265{ PayloadTyp: 96, VPS: h265VPS, @@ -494,7 +494,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "obs 30", + "h265, obs 30.0", &format.H265{ PayloadTyp: 96, VPS: h265VPS, @@ -543,7 +543,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "ffmpeg av1", + "av1, ffmpeg", &format.AV1{ PayloadTyp: 96, }, @@ -604,7 +604,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "issue mediamtx/2289 (missing videocodecid)", + "h264 + aac, issue mediamtx/2289 (missing videocodecid)", &format.H264{ PayloadTyp: 96, SPS: []byte{ @@ -680,7 +680,7 @@ func TestReadTracks(t *testing.T) { }, }, { - "issue mediamtx/2352 (missing audio)", + "h264, issue mediamtx/2352", &format.H264{ PayloadTyp: 96, SPS: h264SPS, @@ -762,6 +762,123 @@ func TestReadTracks(t *testing.T) { }, }, }, + { + "mpeg-1 audio", + nil, + &format.MPEG1Audio{}, + []message.Message{ + &message.DataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + {K: "duration", V: 0}, + {K: "audiocodecid", V: 2}, + {K: "encoder", V: "Lavf58.45.100"}, + {K: "filesize", V: 0}, + }, + }, + }, + }, + }, + { + "pcma", + nil, + &format.G711{}, + []message.Message{ + &message.DataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + {K: "duration", V: 0}, + {K: "audiocodecid", V: 7}, + {K: "encoder", V: "Lavf58.45.100"}, + {K: "filesize", V: 0}, + }, + }, + }, + &message.Audio{ + ChunkStreamID: message.AudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecPCMA, + Rate: message.Rate5512, + Depth: message.Depth16, + Channels: message.ChannelsMono, + Payload: []byte{1, 2, 3, 4}, + }, + }, + }, + { + "pcmu", + nil, + &format.G711{ + MULaw: true, + }, + []message.Message{ + &message.DataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + {K: "duration", V: 0}, + {K: "audiocodecid", V: 8}, + {K: "encoder", V: "Lavf58.45.100"}, + {K: "filesize", V: 0}, + }, + }, + }, + &message.Audio{ + ChunkStreamID: message.AudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecPCMU, + Rate: message.Rate5512, + Depth: message.Depth16, + Channels: message.ChannelsMono, + Payload: []byte{1, 2, 3, 4}, + }, + }, + }, + { + "lpcm gstreamer", + nil, + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 44100, + ChannelCount: 2, + }, + []message.Message{ + &message.DataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + {K: "duration", V: 0}, + {K: "audiocodecid", V: 3}, + {K: "filesize", V: 0}, + }, + }, + }, + &message.Audio{ + ChunkStreamID: message.AudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecLPCM, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, + Payload: []byte{1, 2, 3, 4}, + }, + }, + }, } { t.Run(ca.name, func(t *testing.T) { var buf bytes.Buffer diff --git a/internal/protocols/rtmp/writer.go b/internal/protocols/rtmp/writer.go index fbb69909190..10df62c9029 100644 --- a/internal/protocols/rtmp/writer.go +++ b/internal/protocols/rtmp/writer.go @@ -13,24 +13,37 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/message" ) -func mpeg1AudioRate(sr int) uint8 { - switch sr { - case 5500: - return flvio.SOUND_5_5Khz +func audioRateRTMPToInt(v uint8) int { + switch v { + case message.Rate5512: + return 5512 + case message.Rate11025: + return 11025 + case message.Rate22050: + return 22050 + default: + return 44100 + } +} + +func audioRateIntToRTMP(v int) uint8 { + switch v { + case 5512: + return message.Rate5512 case 11025: - return flvio.SOUND_11Khz + return message.Rate11025 case 22050: - return flvio.SOUND_22Khz + return message.Rate22050 default: - return flvio.SOUND_44Khz + return message.Rate44100 } } func mpeg1AudioChannels(m mpeg1audio.ChannelMode) uint8 { if m == mpeg1audio.ChannelModeMono { - return flvio.SOUND_MONO + return message.ChannelsMono } - return flvio.SOUND_STEREO + return message.ChannelsStereo } // Writer is a wrapper around Conn that provides utilities to mux outgoing data. @@ -141,9 +154,9 @@ func (w *Writer) writeTracks(videoTrack format.Format, audioTrack format.Format) ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: enc, }) @@ -180,9 +193,9 @@ func (w *Writer) WriteMPEG4Audio(pts time.Duration, au []byte) error { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeAU, Payload: au, DTS: pts, @@ -195,8 +208,8 @@ func (w *Writer) WriteMPEG1Audio(pts time.Duration, h *mpeg1audio.FrameHeader, f ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG1Audio, - Rate: mpeg1AudioRate(h.SampleRate), - Depth: flvio.SOUND_16BIT, + Rate: audioRateIntToRTMP(h.SampleRate), + Depth: message.Depth16, Channels: mpeg1AudioChannels(h.ChannelMode), Payload: frame, DTS: pts, diff --git a/internal/protocols/rtmp/writer_test.go b/internal/protocols/rtmp/writer_test.go index 9f3e532c110..dfb03f29b3f 100644 --- a/internal/protocols/rtmp/writer_test.go +++ b/internal/protocols/rtmp/writer_test.go @@ -89,9 +89,9 @@ func TestWriteTracks(t *testing.T) { ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, Codec: message.CodecMPEG4Audio, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, + Rate: message.Rate44100, + Depth: message.Depth16, + Channels: message.ChannelsStereo, AACType: message.AudioAACTypeConfig, Payload: []byte{0x12, 0x10}, }, msg) diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index e0813b52651..ca8aafcffaa 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -524,6 +524,28 @@ func (c *conn) runPublish(conn *rtmp.Conn, u *url.URL) error { }) }) + case *format.G711: + r.OnDataG711(func(pts time.Duration, samples []byte) { + stream.WriteUnit(audioMedia, audioFormat, &unit.G711{ + Base: unit.Base{ + NTP: time.Now(), + PTS: pts, + }, + Samples: samples, + }) + }) + + case *format.LPCM: + r.OnDataLPCM(func(pts time.Duration, samples []byte) { + stream.WriteUnit(audioMedia, audioFormat, &unit.LPCM{ + Base: unit.Base{ + NTP: time.Now(), + PTS: pts, + }, + Samples: samples, + }) + }) + default: return fmt.Errorf("unsupported audio codec: %T", audioFormat) }