Skip to content

Commit

Permalink
stream audio using ffmpeg + rtp
Browse files Browse the repository at this point in the history
  • Loading branch information
qnkhuat committed Jul 19, 2021
1 parent a443613 commit 8d4f125
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 102 deletions.
50 changes: 34 additions & 16 deletions tstream/consumer.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ <h3> Local Video </h3>

<h3> Remote Video </h3>
<div id="remoteVideos"></div> <br />
<div id="remoteAudios"></div> <br />

<h3> Logs </h3>
<div id="logs"></div>
Expand Down Expand Up @@ -47,26 +48,43 @@ <h3> Logs </h3>
//.then(stream => {
let pc = new RTCPeerConnection()
pc.ontrack = function (event) {
//if (event.track.kind !== 'audio') {
// return
// }

let el = document.createElement(event.track.kind)
console.log(event.streams)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = true
document.getElementById('remoteVideos').appendChild(el)

event.track.onmute = function(event) {
el.play()
if (event.track.kind == 'audio') {
let el = document.createElement(event.track.kind)
console.log(event.streams)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = true
document.getElementById('remoteAudios').appendChild(el)

event.track.onmute = function(event) {
el.play()
}

event.streams[0].onremovetrack = ({track}) => {
if (el.parentNode) {
el.parentNode.removeChild(el)
}
}
}
else {
let el = document.createElement(event.track.kind)
console.log(event.streams)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = true
document.getElementById('remoteVideos').appendChild(el)

event.track.onmute = function(event) {
el.play()
}

event.streams[0].onremovetrack = ({track}) => {
if (el.parentNode) {
el.parentNode.removeChild(el)
event.streams[0].onremovetrack = ({track}) => {
if (el.parentNode) {
el.parentNode.removeChild(el)
}
}
}

}

//document.getElementById('localVideo').srcObject = stream
Expand Down
26 changes: 14 additions & 12 deletions tstream/pkg/room/sfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewSFU() *SFU {
}
}

// TODO: this should be requested by client, not server auto send it every 3 seconds
func (s *SFU) Start() {
// request a keyframe every 3 seconds
go func() {
Expand All @@ -52,15 +53,13 @@ func (s *SFU) Start() {
// TODO : break down this func
func (s *SFU) AddPeer(cl *Client) error {

log.Printf("")
peerConn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
log.Printf("Failed to init peer connection: %s", err)
return err
}
defer peerConn.Close()
defer cl.Close()
log.Printf("")

// Accept one audio and one video track incoming
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
Expand All @@ -71,18 +70,16 @@ func (s *SFU) AddPeer(cl *Client) error {
return err
}
}
log.Printf("")

participant := &Participant{
peer: peerConn,
client: cl,
}
log.Printf("")

participantID := s.newParticipantID()
//s.lock.Lock()
s.lock.Lock()
s.participants[participantID] = participant
//s.lock.Unlock()
log.Printf("")
s.lock.Unlock()

// Trickle ICE. Emit server candidate to client
peerConn.OnICECandidate(func(ice *webrtc.ICECandidate) {
Expand All @@ -105,7 +102,6 @@ func (s *SFU) AddPeer(cl *Client) error {

cl.Out <- payload
})
log.Printf("")

peerConn.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
switch p {
Expand All @@ -126,7 +122,6 @@ func (s *SFU) AddPeer(cl *Client) error {
}

})
log.Printf("")

// Add all current tracks to this peer
for _, track := range s.trackLocals {
Expand All @@ -135,30 +130,36 @@ func (s *SFU) AddPeer(cl *Client) error {
}
}

log.Printf("")
// only producer can broadcast
if cl.Role() == message.RProducerRTC {
peerConn.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
// Create a track to fan out our incoming video to all peerse
trackLocal := s.addLocalTrack(t)
defer s.removeLocalTrack(t.ID())
log.Printf("Added a track: %s", trackLocal.Kind())

buf := make([]byte, 1500)
for {
log.Printf("Getting :%s", trackLocal.Kind())
log.Printf("Got :%s", trackLocal.Kind())
// remote from remote
i, _, err := t.Read(buf)
if err != nil {
return
}

// send to all peers
if _, err = trackLocal.Write(buf[:i]); err != nil {
return
}
}
})
}
log.Printf("")

// Signaling starts
// 1. Server send offer to the other peer connection
// 2. Server get answer
// 3. Server send ice candidate
// 4. Peer connection is established
err = s.sendOffer(participant)

for {
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *SFU) AddPeer(cl *Client) error {
log.Println(err)
return err
}

default:
log.Printf("Invalid RTCEvent: %s", rtcMsg.Event)
}
Expand Down
Loading

0 comments on commit 8d4f125

Please sign in to comment.