Skip to content

Commit

Permalink
Remove the jitter buffer in the non transcoded WHIP path (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Apr 3, 2024
1 parent 572ca2a commit 706b057
Show file tree
Hide file tree
Showing 8 changed files with 539 additions and 221 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ require (
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8
github.com/livekit/protocol v1.12.0
github.com/livekit/psrpc v0.5.3-0.20240315045730-ba2e5b9923b5
github.com/livekit/server-sdk-go/v2 v2.1.0
github.com/livekit/server-sdk-go/v2 v2.1.1-0.20240403161440-39126a2c9d49
github.com/pion/dtls/v2 v2.2.10
github.com/pion/interceptor v0.1.25
github.com/pion/interceptor v0.1.27
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.3
github.com/pion/rtp v1.8.4
github.com/pion/sdp/v3 v3.0.8
github.com/pion/webrtc/v3 v3.2.29
github.com/prometheus/client_golang v1.19.0
Expand Down Expand Up @@ -90,7 +90,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/livekit/protocol v1.12.0 h1:B7qsqq5xf9MmyG9WEk9/gMsfMVXuyLNxX5cO6TQil
github.com/livekit/protocol v1.12.0/go.mod h1:G7Pa985GhZv2MCC3UnUocBhZfi3DsWA6WmlSkkpQYTM=
github.com/livekit/psrpc v0.5.3-0.20240315045730-ba2e5b9923b5 h1:I2noW2VjGcf8E0pCJVK5p3x9G3O2u57DaPgSCJWtkRo=
github.com/livekit/psrpc v0.5.3-0.20240315045730-ba2e5b9923b5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.1.0 h1:2YIW6c1IhHTVCsBsTD6P3TLIP3l/3nK+iokopquP8eE=
github.com/livekit/server-sdk-go/v2 v2.1.0/go.mod h1:uOiQU0kIhiZM10y9RIuig2CwRA/g5fW5bzy+2WiIxP8=
github.com/livekit/server-sdk-go/v2 v2.1.1-0.20240403161440-39126a2c9d49 h1:azrrvWCMRJyaPbMJ1fta+OuzWmt6/fsuM5jdGK/dibE=
github.com/livekit/server-sdk-go/v2 v2.1.1-0.20240403161440-39126a2c9d49/go.mod h1:8KmZmvjahDfYnF0STHatRzxyV+OH5gN3zQlHtbpqOAU=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
Expand Down Expand Up @@ -136,8 +136,9 @@ github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.13 h1:xOxP+4V9nSDlUaGFRf/LvAuGHDXRcjIdsbbXPK/w7c8=
github.com/pion/ice/v2 v2.3.13/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.25 h1:pwY9r7P6ToQ3+IF0bajN0xmk/fNw/suTgaTdlwTDmhc=
github.com/pion/interceptor v0.1.25/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y=
github.com/pion/interceptor v0.1.27 h1:mZ01OiGiukwRxezmDGzYjjokCVlDOk4T6BfaL5qrtGo=
github.com/pion/interceptor v0.1.27/go.mod h1:/vVaqLwDjGv4GRbgmChIKZIT5EXFDijwmj4WmIYy9bI=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
Expand All @@ -149,8 +150,9 @@ github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.4 h1:VqNGMNjMDMy9y0d+h+0dfjiWVKUEDQvA963jhJwu200=
github.com/pion/rtp v1.8.4/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0=
github.com/pion/sctp v1.8.12 h1:2VX50pedElH+is6FI+OKyRTeN5oy4mrk2HjnGa3UCmY=
github.com/pion/sctp v1.8.12/go.mod h1:cMLT45jqw3+jiJCrtHVwfQLnfR0MGZ4rgOJwUOIqLkI=
Expand Down Expand Up @@ -248,8 +250,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down
68 changes: 55 additions & 13 deletions pkg/lksdk_output/lksdk_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"

Expand All @@ -39,29 +40,47 @@ const (
type SampleProvider interface {
Close() error
}

type KeyFrameEmitter interface {
ForceKeyFrame() error
}

type PLIHandler struct {
p atomic.Pointer[KeyFrameEmitter]
type PacketSink interface {
HandleRTCPPacket(pkt rtcp.Packet) error
}

type RTCPHandler struct {
p atomic.Pointer[PacketSink]
k atomic.Pointer[KeyFrameEmitter]
}

func (h *PLIHandler) HandlePLI() error {
func (h *RTCPHandler) HandleRTCP(pkt rtcp.Packet) error {
p := h.p.Load()

if p != nil {
return (*p).ForceKeyFrame()
return (*p).HandleRTCPPacket(pkt)
}

return nil
}

func (h *PLIHandler) SetKeyFrameEmitter(p KeyFrameEmitter) {
func (h *RTCPHandler) SetPacketSink(p PacketSink) {
h.p.Store(&p)
}

func (h *RTCPHandler) HandlePLI() error {
k := h.k.Load()

if k != nil {
return (*k).ForceKeyFrame()
}

return nil
}

func (h *RTCPHandler) SetKeyFrameEmitter(k KeyFrameEmitter) {
h.k.Store(&k)
}

type LKSDKOutput struct {
logger logger.Logger
room *lksdk.Room
Expand Down Expand Up @@ -118,7 +137,9 @@ func NewLKSDKOutput(ctx context.Context, p *params.Params) (*LKSDKOutput, error)
lksdk.WithAutoSubscribe(false),
}

if !p.BypassTranscoding {
if p.BypassTranscoding {
opts = append(opts, lksdk.WithInterceptors([]interceptor.Factory{}))
} else {
var br uint32
if p.VideoEncodingOptions != nil {
for _, l := range p.VideoEncodingOptions.Layers {
Expand Down Expand Up @@ -200,7 +221,7 @@ func (s *LKSDKOutput) AddAudioTrack(mimeType string, disableDTX bool, stereo boo
return track, nil
}

func (s *LKSDKOutput) AddVideoTrack(layers []*livekit.VideoLayer, mimeType string) ([]*lksdk.LocalTrack, []*PLIHandler, error) {
func (s *LKSDKOutput) AddVideoTrack(layers []*livekit.VideoLayer, mimeType string) ([]*lksdk.LocalTrack, []*RTCPHandler, error) {
opts := &lksdk.TrackPublicationOptions{
Name: s.params.Video.Name,
Source: s.params.Video.Source,
Expand All @@ -211,18 +232,22 @@ func (s *LKSDKOutput) AddVideoTrack(layers []*livekit.VideoLayer, mimeType strin
var err error

tracks := make([]*lksdk.LocalSampleTrack, 0)
pliHandlers := make([]*PLIHandler, 0)
rtcpHandlers := make([]*RTCPHandler, 0)
for _, layer := range layers {
pliHandler := &PLIHandler{}
pliHandlers = append(pliHandlers, pliHandler)
rtcpHandler := &RTCPHandler{}
rtcpHandlers = append(rtcpHandlers, rtcpHandler)

onRTCP := func(pkt rtcp.Packet) {
switch pkt.(type) {
case *rtcp.PictureLossIndication:
if err := pliHandler.HandlePLI(); err != nil {
if err := rtcpHandler.HandlePLI(); err != nil {
s.logger.Errorw("could not force key frame", err)
}
}

if err := rtcpHandler.HandleRTCP(pkt); err != nil {
s.logger.Errorw("RTCP message handling failed", err)
}
}
track, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{
MimeType: mimeType,
Expand Down Expand Up @@ -256,7 +281,7 @@ func (s *LKSDKOutput) AddVideoTrack(layers []*livekit.VideoLayer, mimeType strin

s.logger.Debugw("published video track")

return tracks, pliHandlers, nil
return tracks, rtcpHandlers, nil
}

func (s *LKSDKOutput) AddOutputs(o ...SampleProvider) {
Expand All @@ -280,6 +305,23 @@ func (s *LKSDKOutput) closeOutput() {
s.room.Disconnect()
}

func (s *LKSDKOutput) WriteRTCP(pkts []rtcp.Packet) error {
if s.room == nil {
return nil
}

if s.room.LocalParticipant == nil {
return nil
}

pc := s.room.LocalParticipant.GetPublisherPeerConnection()
if pc == nil {
return nil
}

return pc.WriteRTCP(pkts)
}

func (s *LKSDKOutput) Close() error {
s.closeOutput()

Expand Down
90 changes: 90 additions & 0 deletions pkg/utils/rtcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import "github.com/pion/rtcp"

func ReplaceRTCPPacketSSRC(pkt rtcp.Packet, newSSRC uint32) (rtcp.Packet, error) {
switch cpkt := pkt.(type) {
case *rtcp.SenderReport:
cpkt.SSRC = newSSRC
case *rtcp.ReceiverReport:
cpkt.SSRC = newSSRC
for i, _ := range cpkt.Reports {
cpkt.Reports[i].SSRC = newSSRC
}
case *rtcp.SourceDescription:
for i, _ := range cpkt.Chunks {
cpkt.Chunks[i].Source = newSSRC
}
case *rtcp.Goodbye:
for i, _ := range cpkt.Sources {
cpkt.Sources[i] = newSSRC
}
case *rtcp.TransportLayerNack:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.RapidResynchronizationRequest:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.TransportLayerCC:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.CCFeedbackReport:
cpkt.SenderSSRC = newSSRC
case *rtcp.PictureLossIndication:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.SliceLossIndication:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.ReceiverEstimatedMaximumBitrate:
cpkt.SenderSSRC = newSSRC
case *rtcp.FullIntraRequest:
cpkt.SenderSSRC = newSSRC
cpkt.MediaSSRC = newSSRC
case *rtcp.ExtendedReport:
cpkt.SenderSSRC = newSSRC
err := handleExtendedReports(cpkt.Reports, newSSRC)
if err != nil {
return nil, err
}
}

return pkt, nil
}

func handleExtendedReports(reports []rtcp.ReportBlock, newSSRC uint32) error {
for _, report := range reports {
switch r := report.(type) {
case *rtcp.LossRLEReportBlock:
r.SSRC = newSSRC
case *rtcp.DuplicateRLEReportBlock:
r.SSRC = newSSRC
case *rtcp.PacketReceiptTimesReportBlock:
r.SSRC = newSSRC
case *rtcp.DLRRReportBlock:
for i, _ := range r.Reports {
r.Reports[i].SSRC = newSSRC
}
case *rtcp.StatisticsSummaryReportBlock:
r.SSRC = newSSRC
case *rtcp.VoIPMetricsReportBlock:
r.SSRC = newSSRC
}
}

return nil
}
Loading

0 comments on commit 706b057

Please sign in to comment.