From 8d4f1253d8dd0a6998e9bb2c2e1e9f49a9400d4b Mon Sep 17 00:00:00 2001 From: Earther Date: Mon, 19 Jul 2021 13:36:49 +0700 Subject: [PATCH] stream audio using ffmpeg + rtp --- tstream/consumer.html | 50 +++++--- tstream/pkg/room/sfu.go | 26 +++-- tstream/pkg/streamer/chat.go | 188 +++++++++++++++++++------------ tstream/pkg/streamer/streamer.go | 2 - 4 files changed, 164 insertions(+), 102 deletions(-) diff --git a/tstream/consumer.html b/tstream/consumer.html index f230b84..dd0cdd3 100644 --- a/tstream/consumer.html +++ b/tstream/consumer.html @@ -9,6 +9,7 @@

Local Video

Remote Video


+

Logs

@@ -47,26 +48,43 @@

Logs

//.then(stream => { let pc = new RTCPeerConnection() pc.ontrack = function (event) { - //if (event.track.kind !== 'audio') { - // return - // } - - let el = document.createElement(event.track.kind) - console.log(event.streams) - el.srcObject = event.streams[0] - el.autoplay = true - el.controls = true - document.getElementById('remoteVideos').appendChild(el) - - event.track.onmute = function(event) { - el.play() + if (event.track.kind == 'audio') { + let el = document.createElement(event.track.kind) + console.log(event.streams) + el.srcObject = event.streams[0] + el.autoplay = true + el.controls = true + document.getElementById('remoteAudios').appendChild(el) + + event.track.onmute = function(event) { + el.play() + } + + event.streams[0].onremovetrack = ({track}) => { + if (el.parentNode) { + el.parentNode.removeChild(el) + } + } } + else { + let el = document.createElement(event.track.kind) + console.log(event.streams) + el.srcObject = event.streams[0] + el.autoplay = true + el.controls = true + document.getElementById('remoteVideos').appendChild(el) + + event.track.onmute = function(event) { + el.play() + } - event.streams[0].onremovetrack = ({track}) => { - if (el.parentNode) { - el.parentNode.removeChild(el) + event.streams[0].onremovetrack = ({track}) => { + if (el.parentNode) { + el.parentNode.removeChild(el) + } } } + } //document.getElementById('localVideo').srcObject = stream diff --git a/tstream/pkg/room/sfu.go b/tstream/pkg/room/sfu.go index 422d3e3..6721844 100644 --- a/tstream/pkg/room/sfu.go +++ b/tstream/pkg/room/sfu.go @@ -40,6 +40,7 @@ func NewSFU() *SFU { } } +// TODO: this should be requested by client, not server auto send it every 3 seconds func (s *SFU) Start() { // request a keyframe every 3 seconds go func() { @@ -52,7 +53,6 @@ func (s *SFU) Start() { // TODO : break down this func func (s *SFU) AddPeer(cl *Client) error { - log.Printf("") peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{}) if err != nil { log.Printf("Failed to init peer connection: %s", err) @@ -60,7 +60,6 @@ func (s *SFU) AddPeer(cl *Client) error { } defer peerConn.Close() defer cl.Close() - log.Printf("") // Accept one audio and one video track incoming for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} { @@ -71,18 +70,16 @@ func (s *SFU) AddPeer(cl *Client) error { return err } } - log.Printf("") participant := &Participant{ peer: peerConn, client: cl, } - log.Printf("") + participantID := s.newParticipantID() - //s.lock.Lock() + s.lock.Lock() s.participants[participantID] = participant - //s.lock.Unlock() - log.Printf("") + s.lock.Unlock() // Trickle ICE. Emit server candidate to client peerConn.OnICECandidate(func(ice *webrtc.ICECandidate) { @@ -105,7 +102,6 @@ func (s *SFU) AddPeer(cl *Client) error { cl.Out <- payload }) - log.Printf("") peerConn.OnConnectionStateChange(func(p webrtc.PeerConnectionState) { switch p { @@ -126,7 +122,6 @@ func (s *SFU) AddPeer(cl *Client) error { } }) - log.Printf("") // Add all current tracks to this peer for _, track := range s.trackLocals { @@ -135,30 +130,36 @@ func (s *SFU) AddPeer(cl *Client) error { } } - log.Printf("") // only producer can broadcast if cl.Role() == message.RProducerRTC { peerConn.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { // Create a track to fan out our incoming video to all peerse trackLocal := s.addLocalTrack(t) defer s.removeLocalTrack(t.ID()) + log.Printf("Added a track: %s", trackLocal.Kind()) buf := make([]byte, 1500) for { - log.Printf("Getting :%s", trackLocal.Kind()) + log.Printf("Got :%s", trackLocal.Kind()) + // remote from remote i, _, err := t.Read(buf) if err != nil { return } + // send to all peers if _, err = trackLocal.Write(buf[:i]); err != nil { return } } }) } - log.Printf("") + // Signaling starts + // 1. Server send offer to the other peer connection + // 2. Server get answer + // 3. Server send ice candidate + // 4. Peer connection is established err = s.sendOffer(participant) for { @@ -200,6 +201,7 @@ func (s *SFU) AddPeer(cl *Client) error { log.Println(err) return err } + default: log.Printf("Invalid RTCEvent: %s", rtcMsg.Event) } diff --git a/tstream/pkg/streamer/chat.go b/tstream/pkg/streamer/chat.go index 207bc4b..c91cc34 100644 --- a/tstream/pkg/streamer/chat.go +++ b/tstream/pkg/streamer/chat.go @@ -3,22 +3,25 @@ package streamer import ( "encoding/json" + "errors" "fmt" "github.com/gdamore/tcell/v2" "github.com/gorilla/schema" "github.com/gorilla/websocket" - "github.com/pion/mediadevices" - "github.com/pion/mediadevices/pkg/codec/vpx" + "io" + //"github.com/pion/mediadevices" + //"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder + //"github.com/pion/mediadevices/pkg/codec/vpx" _ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter _ "github.com/pion/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter - "github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder - "github.com/pion/mediadevices/pkg/frame" - "github.com/pion/mediadevices/pkg/prop" + //"github.com/pion/mediadevices/pkg/frame" + //"github.com/pion/mediadevices/pkg/prop" "github.com/pion/webrtc/v3" "github.com/qnkhuat/tstream/pkg/message" "github.com/rivo/tview" "log" "math" + "net" "net/url" "strings" "time" @@ -145,91 +148,111 @@ func (c *Chat) StartVoiceService() error { ICEServers: []webrtc.ICEServer{{ URLs: []string{"stun:stun.l.google.com:19302"}}, }, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, } - // Create a new RTCPeerConnection - mediaEngine := webrtc.MediaEngine{} + peerConn, err := webrtc.NewPeerConnection(config) - vpxParams, err := vpx.NewVP8Params() - if err != nil { - log.Printf("Failed to open vpx: %s", err) - return err - } - vpxParams.BitRate = 500_000 // 500kbps - opusParams, err := opus.NewParams() - if err != nil { - panic(err) - } + // Create a new RTCPeerConnection + //mediaEngine := webrtc.MediaEngine{} - codecSelector := mediadevices.NewCodecSelector( - mediadevices.WithVideoEncoders(&vpxParams), - mediadevices.WithAudioEncoders(&opusParams), - ) - - codecSelector.Populate(&mediaEngine) - api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine)) - peerConn, err := api.NewPeerConnection(config) - if err != nil { - log.Printf("Failed to start webrtc conn %s", err) - return err - } + //vpxParams, err := vpx.NewVP8Params() + //if err != nil { + // log.Printf("Failed to open vpx: %s", err) + // return err + //} + //vpxParams.BitRate = 500_000 // 500kbps + ////opusParams, err := opus.NewParams() + ////if err != nil { + //// panic(err) + ////} + + //codecSelector := mediadevices.NewCodecSelector( + // mediadevices.WithVideoEncoders(&vpxParams), + ////mediadevices.WithAudioEncoders(&opusParams), + //) + + //codecSelector.Populate(&mediaEngine) + //api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine)) + //peerConn, err := api.NewPeerConnection(config) + //if err != nil { + // log.Printf("Failed to start webrtc conn %s", err) + // return err + //} - s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ - Video: func(c *mediadevices.MediaTrackConstraints) { - c.FrameFormat = prop.FrameFormat(frame.FormatYUY2) - c.Width = prop.Int(640) - c.Height = prop.Int(480) - }, - Audio: func(c *mediadevices.MediaTrackConstraints) {} - - Codec: codecSelector, - }) + //s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ + // //Video: func(c *mediadevices.MediaTrackConstraints) { + // // c.FrameFormat = prop.FrameFormat(frame.FormatYUY2) + // // c.Width = prop.Int(640) + // // c.Height = prop.Int(480) + // //}, + // Audio: func(c *mediadevices.MediaTrackConstraints) { + // c.SampleSize = prop.IntExact(16) // <-- this will set the source to use 16 bit format. + // }, + // Codec: codecSelector, + //}) + // Open a UDP Listener for RTP Packets on port 5004 + log.Printf("Openign an UDP host at localhost:5004") + listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 5004}) if err != nil { - log.Printf("This thing is too conventional %s", err) - return err - } - - for _, track := range s.GetTracks() { - track.OnEnded(func(err error) { - fmt.Printf("Track (ID: %s) ended with error: %v\n", - track.ID(), err) - }) - _, err = peerConn.AddTransceiverFromTrack(track, - webrtc.RtpTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionSendonly, - }, - ) - if err != nil { - log.Printf("Failed to add track %s", err) - return err - } + panic(err) } - - //peerConn, err := webrtc.NewPeerConnection(config) - - // Open a UDP Listener for RTP Packets on port 5004 - //listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) - //if err != nil { - // panic(err) - //} //defer func() { // if err = listener.Close(); err != nil { // panic(err) // } //}() - //// Create a video track - //lcoalTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "tstream") + // Create a video track + //videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "pion") //if err != nil { // panic(err) //} - //rtpSender, err := peerConn.AddTrack(localTrack) + //videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "tstream") //if err != nil { // panic(err) //} + audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "tstream") + if err != nil { + panic(err) + } + + rtpSender, err := peerConn.AddTrack(audioTrack) + if err != nil { + panic(err) + } + + // Read incoming RTCP packets + // Before these packets are returned they are processed by interceptors. For things + // like NACK this needs to be called. + go func() { + rtcpBuf := make([]byte, 1500) + for { + //log.Printf("Reading from video") + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + return + } + } + }() + + //for _, track := range s.GetTracks() { + // track.OnEnded(func(err error) { + // fmt.Printf("Track (ID: %s) ended with error: %v\n", + // track.ID(), err) + // }) + + // _, err = peerConn.AddTransceiverFromTrack(track, + // webrtc.RtpTransceiverInit{ + // Direction: webrtc.RTPTransceiverDirectionSendonly, + // }, + // ) + // if err != nil { + // log.Printf("Failed to add track %s", err) + // return err + // } + //} + // wsconnection is for signaling and update track changes wsConn, err := c.connectWS(message.RProducerRTC) if err != nil { @@ -322,9 +345,9 @@ func (c *Chat) StartVoiceService() error { continue } - if err := peerConn.SetLocalDescription(answer); err != nil { - log.Printf("Failed to set local description: %v", err) - continue + err = peerConn.SetLocalDescription(answer) + if err != nil { + log.Printf("Failed to set local description: %s", err) } answerByte, _ := json.Marshal(answer) @@ -356,6 +379,27 @@ func (c *Chat) StartVoiceService() error { } } }() + go func() { + + // Read RTP packets forever and send them to the WebRTC Client + inboundRTPPacket := make([]byte, 1500) // UDP MTU + for { + //log.Printf("read from video") + n, _, err := listener.ReadFrom(inboundRTPPacket) + if err != nil { + panic(fmt.Sprintf("error during read: %s", err)) + } + + if _, err = audioTrack.Write(inboundRTPPacket[:n]); err != nil { + if errors.Is(err, io.ErrClosedPipe) { + // The peerConnection has been closed. + return + } + + panic(err) + } + } + }() return nil } diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 88c7f9c..ea9065e 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -21,8 +21,6 @@ import ( "github.com/qnkhuat/tstream/pkg/ptyMaster" ) -// TODO: if we supports windows this should be changed - type Streamer struct { pty *ptyMaster.PtyMaster serverAddr string