diff --git a/tstream/consumer.html b/tstream/consumer.html
index f230b84..dd0cdd3 100644
--- a/tstream/consumer.html
+++ b/tstream/consumer.html
@@ -9,6 +9,7 @@
Local Video
Remote Video
+
Logs
@@ -47,26 +48,43 @@ Logs
//.then(stream => {
let pc = new RTCPeerConnection()
pc.ontrack = function (event) {
- //if (event.track.kind !== 'audio') {
- // return
- // }
-
- let el = document.createElement(event.track.kind)
- console.log(event.streams)
- el.srcObject = event.streams[0]
- el.autoplay = true
- el.controls = true
- document.getElementById('remoteVideos').appendChild(el)
-
- event.track.onmute = function(event) {
- el.play()
+ if (event.track.kind == 'audio') {
+ let el = document.createElement(event.track.kind)
+ console.log(event.streams)
+ el.srcObject = event.streams[0]
+ el.autoplay = true
+ el.controls = true
+ document.getElementById('remoteAudios').appendChild(el)
+
+ event.track.onmute = function(event) {
+ el.play()
+ }
+
+ event.streams[0].onremovetrack = ({track}) => {
+ if (el.parentNode) {
+ el.parentNode.removeChild(el)
+ }
+ }
}
+ else {
+ let el = document.createElement(event.track.kind)
+ console.log(event.streams)
+ el.srcObject = event.streams[0]
+ el.autoplay = true
+ el.controls = true
+ document.getElementById('remoteVideos').appendChild(el)
+
+ event.track.onmute = function(event) {
+ el.play()
+ }
- event.streams[0].onremovetrack = ({track}) => {
- if (el.parentNode) {
- el.parentNode.removeChild(el)
+ event.streams[0].onremovetrack = ({track}) => {
+ if (el.parentNode) {
+ el.parentNode.removeChild(el)
+ }
}
}
+
}
//document.getElementById('localVideo').srcObject = stream
diff --git a/tstream/pkg/room/sfu.go b/tstream/pkg/room/sfu.go
index 422d3e3..6721844 100644
--- a/tstream/pkg/room/sfu.go
+++ b/tstream/pkg/room/sfu.go
@@ -40,6 +40,7 @@ func NewSFU() *SFU {
}
}
+// TODO: this should be requested by client, not server auto send it every 3 seconds
func (s *SFU) Start() {
// request a keyframe every 3 seconds
go func() {
@@ -52,7 +53,6 @@ func (s *SFU) Start() {
// TODO : break down this func
func (s *SFU) AddPeer(cl *Client) error {
- log.Printf("")
peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
log.Printf("Failed to init peer connection: %s", err)
@@ -60,7 +60,6 @@ func (s *SFU) AddPeer(cl *Client) error {
}
defer peerConn.Close()
defer cl.Close()
- log.Printf("")
// Accept one audio and one video track incoming
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
@@ -71,18 +70,16 @@ func (s *SFU) AddPeer(cl *Client) error {
return err
}
}
- log.Printf("")
participant := &Participant{
peer: peerConn,
client: cl,
}
- log.Printf("")
+
participantID := s.newParticipantID()
- //s.lock.Lock()
+ s.lock.Lock()
s.participants[participantID] = participant
- //s.lock.Unlock()
- log.Printf("")
+ s.lock.Unlock()
// Trickle ICE. Emit server candidate to client
peerConn.OnICECandidate(func(ice *webrtc.ICECandidate) {
@@ -105,7 +102,6 @@ func (s *SFU) AddPeer(cl *Client) error {
cl.Out <- payload
})
- log.Printf("")
peerConn.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
switch p {
@@ -126,7 +122,6 @@ func (s *SFU) AddPeer(cl *Client) error {
}
})
- log.Printf("")
// Add all current tracks to this peer
for _, track := range s.trackLocals {
@@ -135,30 +130,36 @@ func (s *SFU) AddPeer(cl *Client) error {
}
}
- log.Printf("")
// only producer can broadcast
if cl.Role() == message.RProducerRTC {
peerConn.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
// Create a track to fan out our incoming video to all peerse
trackLocal := s.addLocalTrack(t)
defer s.removeLocalTrack(t.ID())
+ log.Printf("Added a track: %s", trackLocal.Kind())
buf := make([]byte, 1500)
for {
- log.Printf("Getting :%s", trackLocal.Kind())
+ log.Printf("Got :%s", trackLocal.Kind())
+ // remote from remote
i, _, err := t.Read(buf)
if err != nil {
return
}
+ // send to all peers
if _, err = trackLocal.Write(buf[:i]); err != nil {
return
}
}
})
}
- log.Printf("")
+ // Signaling starts
+ // 1. Server send offer to the other peer connection
+ // 2. Server get answer
+ // 3. Server send ice candidate
+ // 4. Peer connection is established
err = s.sendOffer(participant)
for {
@@ -200,6 +201,7 @@ func (s *SFU) AddPeer(cl *Client) error {
log.Println(err)
return err
}
+
default:
log.Printf("Invalid RTCEvent: %s", rtcMsg.Event)
}
diff --git a/tstream/pkg/streamer/chat.go b/tstream/pkg/streamer/chat.go
index 207bc4b..c91cc34 100644
--- a/tstream/pkg/streamer/chat.go
+++ b/tstream/pkg/streamer/chat.go
@@ -3,22 +3,25 @@ package streamer
import (
"encoding/json"
+ "errors"
"fmt"
"github.com/gdamore/tcell/v2"
"github.com/gorilla/schema"
"github.com/gorilla/websocket"
- "github.com/pion/mediadevices"
- "github.com/pion/mediadevices/pkg/codec/vpx"
+ "io"
+ //"github.com/pion/mediadevices"
+ //"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder
+ //"github.com/pion/mediadevices/pkg/codec/vpx"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
_ "github.com/pion/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter
- "github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder
- "github.com/pion/mediadevices/pkg/frame"
- "github.com/pion/mediadevices/pkg/prop"
+ //"github.com/pion/mediadevices/pkg/frame"
+ //"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v3"
"github.com/qnkhuat/tstream/pkg/message"
"github.com/rivo/tview"
"log"
"math"
+ "net"
"net/url"
"strings"
"time"
@@ -145,91 +148,111 @@ func (c *Chat) StartVoiceService() error {
ICEServers: []webrtc.ICEServer{{
URLs: []string{"stun:stun.l.google.com:19302"}},
},
- SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
- // Create a new RTCPeerConnection
- mediaEngine := webrtc.MediaEngine{}
+ peerConn, err := webrtc.NewPeerConnection(config)
- vpxParams, err := vpx.NewVP8Params()
- if err != nil {
- log.Printf("Failed to open vpx: %s", err)
- return err
- }
- vpxParams.BitRate = 500_000 // 500kbps
- opusParams, err := opus.NewParams()
- if err != nil {
- panic(err)
- }
+ // Create a new RTCPeerConnection
+ //mediaEngine := webrtc.MediaEngine{}
- codecSelector := mediadevices.NewCodecSelector(
- mediadevices.WithVideoEncoders(&vpxParams),
- mediadevices.WithAudioEncoders(&opusParams),
- )
-
- codecSelector.Populate(&mediaEngine)
- api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine))
- peerConn, err := api.NewPeerConnection(config)
- if err != nil {
- log.Printf("Failed to start webrtc conn %s", err)
- return err
- }
+ //vpxParams, err := vpx.NewVP8Params()
+ //if err != nil {
+ // log.Printf("Failed to open vpx: %s", err)
+ // return err
+ //}
+ //vpxParams.BitRate = 500_000 // 500kbps
+ ////opusParams, err := opus.NewParams()
+ ////if err != nil {
+ //// panic(err)
+ ////}
+
+ //codecSelector := mediadevices.NewCodecSelector(
+ // mediadevices.WithVideoEncoders(&vpxParams),
+ ////mediadevices.WithAudioEncoders(&opusParams),
+ //)
+
+ //codecSelector.Populate(&mediaEngine)
+ //api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine))
+ //peerConn, err := api.NewPeerConnection(config)
+ //if err != nil {
+ // log.Printf("Failed to start webrtc conn %s", err)
+ // return err
+ //}
- s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
- Video: func(c *mediadevices.MediaTrackConstraints) {
- c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
- c.Width = prop.Int(640)
- c.Height = prop.Int(480)
- },
- Audio: func(c *mediadevices.MediaTrackConstraints) {}
-
- Codec: codecSelector,
- })
+ //s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
+ // //Video: func(c *mediadevices.MediaTrackConstraints) {
+ // // c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
+ // // c.Width = prop.Int(640)
+ // // c.Height = prop.Int(480)
+ // //},
+ // Audio: func(c *mediadevices.MediaTrackConstraints) {
+ // c.SampleSize = prop.IntExact(16) // <-- this will set the source to use 16 bit format.
+ // },
+ // Codec: codecSelector,
+ //})
+ // Open a UDP Listener for RTP Packets on port 5004
+ log.Printf("Openign an UDP host at localhost:5004")
+ listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 5004})
if err != nil {
- log.Printf("This thing is too conventional %s", err)
- return err
- }
-
- for _, track := range s.GetTracks() {
- track.OnEnded(func(err error) {
- fmt.Printf("Track (ID: %s) ended with error: %v\n",
- track.ID(), err)
- })
- _, err = peerConn.AddTransceiverFromTrack(track,
- webrtc.RtpTransceiverInit{
- Direction: webrtc.RTPTransceiverDirectionSendonly,
- },
- )
- if err != nil {
- log.Printf("Failed to add track %s", err)
- return err
- }
+ panic(err)
}
-
- //peerConn, err := webrtc.NewPeerConnection(config)
-
- // Open a UDP Listener for RTP Packets on port 5004
- //listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
- //if err != nil {
- // panic(err)
- //}
//defer func() {
// if err = listener.Close(); err != nil {
// panic(err)
// }
//}()
- //// Create a video track
- //lcoalTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "tstream")
+ // Create a video track
+ //videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "pion")
//if err != nil {
// panic(err)
//}
- //rtpSender, err := peerConn.AddTrack(localTrack)
+ //videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "tstream")
//if err != nil {
// panic(err)
//}
+ audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "tstream")
+ if err != nil {
+ panic(err)
+ }
+
+ rtpSender, err := peerConn.AddTrack(audioTrack)
+ if err != nil {
+ panic(err)
+ }
+
+ // Read incoming RTCP packets
+ // Before these packets are returned they are processed by interceptors. For things
+ // like NACK this needs to be called.
+ go func() {
+ rtcpBuf := make([]byte, 1500)
+ for {
+ //log.Printf("Reading from video")
+ if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
+ return
+ }
+ }
+ }()
+
+ //for _, track := range s.GetTracks() {
+ // track.OnEnded(func(err error) {
+ // fmt.Printf("Track (ID: %s) ended with error: %v\n",
+ // track.ID(), err)
+ // })
+
+ // _, err = peerConn.AddTransceiverFromTrack(track,
+ // webrtc.RtpTransceiverInit{
+ // Direction: webrtc.RTPTransceiverDirectionSendonly,
+ // },
+ // )
+ // if err != nil {
+ // log.Printf("Failed to add track %s", err)
+ // return err
+ // }
+ //}
+
// wsconnection is for signaling and update track changes
wsConn, err := c.connectWS(message.RProducerRTC)
if err != nil {
@@ -322,9 +345,9 @@ func (c *Chat) StartVoiceService() error {
continue
}
- if err := peerConn.SetLocalDescription(answer); err != nil {
- log.Printf("Failed to set local description: %v", err)
- continue
+ err = peerConn.SetLocalDescription(answer)
+ if err != nil {
+ log.Printf("Failed to set local description: %s", err)
}
answerByte, _ := json.Marshal(answer)
@@ -356,6 +379,27 @@ func (c *Chat) StartVoiceService() error {
}
}
}()
+ go func() {
+
+ // Read RTP packets forever and send them to the WebRTC Client
+ inboundRTPPacket := make([]byte, 1500) // UDP MTU
+ for {
+ //log.Printf("read from video")
+ n, _, err := listener.ReadFrom(inboundRTPPacket)
+ if err != nil {
+ panic(fmt.Sprintf("error during read: %s", err))
+ }
+
+ if _, err = audioTrack.Write(inboundRTPPacket[:n]); err != nil {
+ if errors.Is(err, io.ErrClosedPipe) {
+ // The peerConnection has been closed.
+ return
+ }
+
+ panic(err)
+ }
+ }
+ }()
return nil
}
diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go
index 88c7f9c..ea9065e 100644
--- a/tstream/pkg/streamer/streamer.go
+++ b/tstream/pkg/streamer/streamer.go
@@ -21,8 +21,6 @@ import (
"github.com/qnkhuat/tstream/pkg/ptyMaster"
)
-// TODO: if we supports windows this should be changed
-
type Streamer struct {
pty *ptyMaster.PtyMaster
serverAddr string