Skip to content

Commit

Permalink
Make onNegotiationNeeded conform to spec
Browse files Browse the repository at this point in the history
- Removes non-canon logic
  • Loading branch information
edaniels committed Jun 24, 2024
1 parent 4430f41 commit b3856ff
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 96 deletions.
17 changes: 15 additions & 2 deletions operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ type operations struct {
mu sync.Mutex
busy bool
ops *list.List

updateNegotiationNeededFlagOnEmptyChain *atomicBool
onNegotiationNeeded func()
}

func newOperations() *operations {
func newOperations(
updateNegotiationNeededFlagOnEmptyChain *atomicBool,
onNegotiationNeeded func(),
) *operations {
return &operations{
ops: list.New(),
ops: list.New(),
updateNegotiationNeededFlagOnEmptyChain: updateNegotiationNeededFlagOnEmptyChain,
onNegotiationNeeded: onNegotiationNeeded,
}
}

Expand Down Expand Up @@ -93,4 +101,9 @@ func (o *operations) start() {
fn()
fn = o.pop()
}
if !o.updateNegotiationNeededFlagOnEmptyChain.get() {
return
}
o.updateNegotiationNeededFlagOnEmptyChain.set(false)
o.onNegotiationNeeded()
}
22 changes: 19 additions & 3 deletions operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@
package webrtc

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestOperations_Enqueue(t *testing.T) {
ops := newOperations()
for i := 0; i < 100; i++ {
updateNegotiationNeededFlagOnEmptyChain := &atomicBool{}
onNegotiationNeededCalledCount := 0
var onNegotiationNeededCalledCountMu sync.Mutex
ops := newOperations(updateNegotiationNeededFlagOnEmptyChain, func() {
onNegotiationNeededCalledCountMu.Lock()
onNegotiationNeededCalledCount++
onNegotiationNeededCalledCountMu.Unlock()
})
for resultSet := 0; resultSet < 100; resultSet++ {
results := make([]int, 16)
resultSetCopy := resultSet
for i := range results {
func(j int) {
ops.Enqueue(func() {
results[j] = j * j
if resultSetCopy > 50 {
updateNegotiationNeededFlagOnEmptyChain.set(true)
}
})
}(i)
}
Expand All @@ -26,9 +38,13 @@ func TestOperations_Enqueue(t *testing.T) {
assert.Equal(t, len(expected), len(results))
assert.Equal(t, expected, results)
}
onNegotiationNeededCalledCountMu.Lock()
defer onNegotiationNeededCalledCountMu.Unlock()
assert.NotEqual(t, onNegotiationNeededCalledCount, 0)
}

func TestOperations_Done(*testing.T) {
ops := newOperations()
ops := newOperations(&atomicBool{}, func() {
})
ops.Done()
}
74 changes: 32 additions & 42 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type PeerConnection struct {

idpLoginURL *string

isClosed *atomicBool
isNegotiationNeeded *atomicBool
negotiationNeededState negotiationNeededState
isClosed *atomicBool
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

lastOffer string
lastAnswer string
Expand Down Expand Up @@ -115,6 +115,7 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
// https://w3c.github.io/webrtc-pc/#constructor (Step #2)
// Some variables defined explicitly despite their implicit zero values to
// allow better readability to understand what is happening.

pc := &PeerConnection{
statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()),
configuration: Configuration{
Expand All @@ -125,18 +126,19 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
Certificates: []Certificate{},
ICECandidatePoolSize: 0,
},
ops: newOperations(),
isClosed: &atomicBool{},
isNegotiationNeeded: &atomicBool{},
negotiationNeededState: negotiationNeededStateEmpty,
lastOffer: "",
lastAnswer: "",
greaterMid: -1,
signalingState: SignalingStateStable,
isClosed: &atomicBool{},
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
lastAnswer: "",
greaterMid: -1,
signalingState: SignalingStateStable,

api: api,
log: api.settingEngine.LoggerFactory.NewLogger("pc"),
}
pc.ops = newOperations(pc.updateNegotiationNeededFlagOnEmptyChain, pc.onNegotiationNeeded)

pc.iceConnectionState.Store(ICEConnectionStateNew)
pc.connectionState.Store(PeerConnectionStateNew)

Expand Down Expand Up @@ -293,66 +295,54 @@ func (pc *PeerConnection) OnNegotiationNeeded(f func()) {

// onNegotiationNeeded enqueues negotiationNeededOp if necessary
// caller of this method should hold `pc.mu` lock
// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
func (pc *PeerConnection) onNegotiationNeeded() {
// https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag
// non-canon step 1
if pc.negotiationNeededState == negotiationNeededStateRun {
pc.negotiationNeededState = negotiationNeededStateQueue
return
} else if pc.negotiationNeededState == negotiationNeededStateQueue {
// 4.7.3.1 If the length of connection.[[Operations]] is not 0, then set
// connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to true, and abort these steps.
if !pc.ops.IsEmpty() {
pc.updateNegotiationNeededFlagOnEmptyChain.set(true)
return
}
pc.negotiationNeededState = negotiationNeededStateRun
pc.ops.Enqueue(pc.negotiationNeededOp)
}

// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
func (pc *PeerConnection) negotiationNeededOp() {
// non-canon, reset needed state machine and run again if there was a request
defer func() {
pc.mu.Lock()
defer pc.mu.Unlock()
if pc.negotiationNeededState == negotiationNeededStateQueue {
defer pc.onNegotiationNeeded()
}
pc.negotiationNeededState = negotiationNeededStateEmpty
}()

// Don't run NegotiatedNeeded checks if OnNegotiationNeeded is not set
if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); !ok || handler == nil {
return
}

// https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag
// Step 2.1
// 4.7.3.2.1 If connection.[[IsClosed]] is true, abort these steps.
if pc.isClosed.get() {
return
}
// non-canon step 2.2

// 4.7.3.2.2 If the length of connection.[[Operations]] is not 0,
// then set connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to
// true, and abort these steps.
if !pc.ops.IsEmpty() {
pc.ops.Enqueue(pc.negotiationNeededOp)
pc.updateNegotiationNeededFlagOnEmptyChain.set(true)
return
}

// Step 2.3
// 4.7.3.2.3 If connection's signaling state is not "stable", abort these steps.
if pc.SignalingState() != SignalingStateStable {
return
}

// Step 2.4
// 4.7.3.2.4 If the result of checking if negotiation is needed is false,
// clear the negotiation-needed flag by setting connection.[[NegotiationNeeded]]
// to false, and abort these steps.
if !pc.checkNegotiationNeeded() {
pc.isNegotiationNeeded.set(false)
return
}

// Step 2.5
// 4.7.3.2.5 If connection.[[NegotiationNeeded]] is already true, abort these steps.
if pc.isNegotiationNeeded.get() {
return
}

// Step 2.6
// 4.7.3.2.6 Set connection.[[NegotiationNeeded]] to true.
pc.isNegotiationNeeded.set(true)

// Step 2.7
// 4.7.3.2.7 Fire an event named negotiationneeded at connection.
if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); ok && handler != nil {
handler()
}
Expand Down
38 changes: 0 additions & 38 deletions peerconnection_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,41 +1623,3 @@ func TestPeerConnectionState(t *testing.T) {
assert.NoError(t, pc.Close())
assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState())
}

// See https://github.com/pion/webrtc/issues/2774
func TestNegotiationNeededAddedAfterOpQueueDone(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()

report := test.CheckRoutines(t)
defer report()

pc, err := NewPeerConnection(Configuration{})
if err != nil {
t.Error(err.Error())
}

var wg sync.WaitGroup
wg.Add(1)

_, err = pc.CreateDataChannel("initial_data_channel", nil)
assert.NoError(t, err)

// after there are no ops left in the queue, a previously faulty version
// of negotiationNeededOp would keep the negotiation needed state in
// negotiationNeededStateQueue which will cause all subsequent
// onNegotiationNeeded calls to never queue again, only if
// OnNegotiationNeeded has not been set yet.
for !pc.ops.IsEmpty() {
time.Sleep(time.Millisecond)
}

pc.OnNegotiationNeeded(wg.Done)

_, err = pc.CreateDataChannel("another_data_channel", nil)
assert.NoError(t, err)

wg.Wait()

assert.NoError(t, pc.Close())
}
11 changes: 0 additions & 11 deletions peerconnectionstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,3 @@ func (t PeerConnectionState) String() string {
return ErrUnknownType.Error()
}
}

type negotiationNeededState int

const (
// NegotiationNeededStateEmpty not running and queue is empty
negotiationNeededStateEmpty = iota
// NegotiationNeededStateEmpty running and queue is empty
negotiationNeededStateRun
// NegotiationNeededStateEmpty running and queue
negotiationNeededStateQueue
)

0 comments on commit b3856ff

Please sign in to comment.