diff --git a/operations.go b/operations.go index d9dca4a8e48..bc366ac34db 100644 --- a/operations.go +++ b/operations.go @@ -16,11 +16,19 @@ type operations struct { mu sync.Mutex busy bool ops *list.List + + updateNegotiationNeededFlagOnEmptyChain *atomicBool + onNegotiationNeeded func() } -func newOperations() *operations { +func newOperations( + updateNegotiationNeededFlagOnEmptyChain *atomicBool, + onNegotiationNeeded func(), +) *operations { return &operations{ - ops: list.New(), + ops: list.New(), + updateNegotiationNeededFlagOnEmptyChain: updateNegotiationNeededFlagOnEmptyChain, + onNegotiationNeeded: onNegotiationNeeded, } } @@ -93,4 +101,9 @@ func (o *operations) start() { fn() fn = o.pop() } + if !o.updateNegotiationNeededFlagOnEmptyChain.get() { + return + } + o.updateNegotiationNeededFlagOnEmptyChain.set(false) + o.onNegotiationNeeded() } diff --git a/operations_test.go b/operations_test.go index f426549f6c5..428c2b4df97 100644 --- a/operations_test.go +++ b/operations_test.go @@ -4,19 +4,31 @@ package webrtc import ( + "sync" "testing" "github.com/stretchr/testify/assert" ) func TestOperations_Enqueue(t *testing.T) { - ops := newOperations() - for i := 0; i < 100; i++ { + updateNegotiationNeededFlagOnEmptyChain := &atomicBool{} + onNegotiationNeededCalledCount := 0 + var onNegotiationNeededCalledCountMu sync.Mutex + ops := newOperations(updateNegotiationNeededFlagOnEmptyChain, func() { + onNegotiationNeededCalledCountMu.Lock() + onNegotiationNeededCalledCount++ + onNegotiationNeededCalledCountMu.Unlock() + }) + for resultSet := 0; resultSet < 100; resultSet++ { results := make([]int, 16) + resultSetCopy := resultSet for i := range results { func(j int) { ops.Enqueue(func() { results[j] = j * j + if resultSetCopy > 50 { + updateNegotiationNeededFlagOnEmptyChain.set(true) + } }) }(i) } @@ -26,9 +38,13 @@ func TestOperations_Enqueue(t *testing.T) { assert.Equal(t, len(expected), len(results)) assert.Equal(t, expected, results) } + onNegotiationNeededCalledCountMu.Lock() + defer onNegotiationNeededCalledCountMu.Unlock() + assert.NotEqual(t, onNegotiationNeededCalledCount, 0) } func TestOperations_Done(*testing.T) { - ops := newOperations() + ops := newOperations(&atomicBool{}, func() { + }) ops.Done() } diff --git a/peerconnection.go b/peerconnection.go index 36e81a2b765..92977b5f763 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -55,9 +55,9 @@ type PeerConnection struct { idpLoginURL *string - isClosed *atomicBool - isNegotiationNeeded *atomicBool - negotiationNeededState negotiationNeededState + isClosed *atomicBool + isNegotiationNeeded *atomicBool + updateNegotiationNeededFlagOnEmptyChain *atomicBool lastOffer string lastAnswer string @@ -115,6 +115,7 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, // https://w3c.github.io/webrtc-pc/#constructor (Step #2) // Some variables defined explicitly despite their implicit zero values to // allow better readability to understand what is happening. + pc := &PeerConnection{ statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()), configuration: Configuration{ @@ -125,18 +126,19 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, Certificates: []Certificate{}, ICECandidatePoolSize: 0, }, - ops: newOperations(), - isClosed: &atomicBool{}, - isNegotiationNeeded: &atomicBool{}, - negotiationNeededState: negotiationNeededStateEmpty, - lastOffer: "", - lastAnswer: "", - greaterMid: -1, - signalingState: SignalingStateStable, + isClosed: &atomicBool{}, + isNegotiationNeeded: &atomicBool{}, + updateNegotiationNeededFlagOnEmptyChain: &atomicBool{}, + lastOffer: "", + lastAnswer: "", + greaterMid: -1, + signalingState: SignalingStateStable, api: api, log: api.settingEngine.LoggerFactory.NewLogger("pc"), } + pc.ops = newOperations(pc.updateNegotiationNeededFlagOnEmptyChain, pc.onNegotiationNeeded) + pc.iceConnectionState.Store(ICEConnectionStateNew) pc.connectionState.Store(PeerConnectionStateNew) @@ -293,66 +295,54 @@ func (pc *PeerConnection) OnNegotiationNeeded(f func()) { // onNegotiationNeeded enqueues negotiationNeededOp if necessary // caller of this method should hold `pc.mu` lock +// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag func (pc *PeerConnection) onNegotiationNeeded() { - // https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag - // non-canon step 1 - if pc.negotiationNeededState == negotiationNeededStateRun { - pc.negotiationNeededState = negotiationNeededStateQueue - return - } else if pc.negotiationNeededState == negotiationNeededStateQueue { + // 4.7.3.1 If the length of connection.[[Operations]] is not 0, then set + // connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to true, and abort these steps. + if !pc.ops.IsEmpty() { + pc.updateNegotiationNeededFlagOnEmptyChain.set(true) return } - pc.negotiationNeededState = negotiationNeededStateRun pc.ops.Enqueue(pc.negotiationNeededOp) } +// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag func (pc *PeerConnection) negotiationNeededOp() { - // non-canon, reset needed state machine and run again if there was a request - defer func() { - pc.mu.Lock() - defer pc.mu.Unlock() - if pc.negotiationNeededState == negotiationNeededStateQueue { - defer pc.onNegotiationNeeded() - } - pc.negotiationNeededState = negotiationNeededStateEmpty - }() - - // Don't run NegotiatedNeeded checks if OnNegotiationNeeded is not set - if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); !ok || handler == nil { - return - } - - // https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag - // Step 2.1 + // 4.7.3.2.1 If connection.[[IsClosed]] is true, abort these steps. if pc.isClosed.get() { return } - // non-canon step 2.2 + + // 4.7.3.2.2 If the length of connection.[[Operations]] is not 0, + // then set connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to + // true, and abort these steps. if !pc.ops.IsEmpty() { - pc.ops.Enqueue(pc.negotiationNeededOp) + pc.updateNegotiationNeededFlagOnEmptyChain.set(true) return } - // Step 2.3 + // 4.7.3.2.3 If connection's signaling state is not "stable", abort these steps. if pc.SignalingState() != SignalingStateStable { return } - // Step 2.4 + // 4.7.3.2.4 If the result of checking if negotiation is needed is false, + // clear the negotiation-needed flag by setting connection.[[NegotiationNeeded]] + // to false, and abort these steps. if !pc.checkNegotiationNeeded() { pc.isNegotiationNeeded.set(false) return } - // Step 2.5 + // 4.7.3.2.5 If connection.[[NegotiationNeeded]] is already true, abort these steps. if pc.isNegotiationNeeded.get() { return } - // Step 2.6 + // 4.7.3.2.6 Set connection.[[NegotiationNeeded]] to true. pc.isNegotiationNeeded.set(true) - // Step 2.7 + // 4.7.3.2.7 Fire an event named negotiationneeded at connection. if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); ok && handler != nil { handler() } diff --git a/peerconnection_go_test.go b/peerconnection_go_test.go index 15cce5edb65..b51a0c65057 100644 --- a/peerconnection_go_test.go +++ b/peerconnection_go_test.go @@ -1623,41 +1623,3 @@ func TestPeerConnectionState(t *testing.T) { assert.NoError(t, pc.Close()) assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState()) } - -// See https://github.com/pion/webrtc/issues/2774 -func TestNegotiationNeededAddedAfterOpQueueDone(t *testing.T) { - lim := test.TimeOut(time.Second * 30) - defer lim.Stop() - - report := test.CheckRoutines(t) - defer report() - - pc, err := NewPeerConnection(Configuration{}) - if err != nil { - t.Error(err.Error()) - } - - var wg sync.WaitGroup - wg.Add(1) - - _, err = pc.CreateDataChannel("initial_data_channel", nil) - assert.NoError(t, err) - - // after there are no ops left in the queue, a previously faulty version - // of negotiationNeededOp would keep the negotiation needed state in - // negotiationNeededStateQueue which will cause all subsequent - // onNegotiationNeeded calls to never queue again, only if - // OnNegotiationNeeded has not been set yet. - for !pc.ops.IsEmpty() { - time.Sleep(time.Millisecond) - } - - pc.OnNegotiationNeeded(wg.Done) - - _, err = pc.CreateDataChannel("another_data_channel", nil) - assert.NoError(t, err) - - wg.Wait() - - assert.NoError(t, pc.Close()) -} diff --git a/peerconnectionstate.go b/peerconnectionstate.go index 1f1456882d8..b626a31e32d 100644 --- a/peerconnectionstate.go +++ b/peerconnectionstate.go @@ -84,14 +84,3 @@ func (t PeerConnectionState) String() string { return ErrUnknownType.Error() } } - -type negotiationNeededState int - -const ( - // NegotiationNeededStateEmpty not running and queue is empty - negotiationNeededStateEmpty = iota - // NegotiationNeededStateEmpty running and queue is empty - negotiationNeededStateRun - // NegotiationNeededStateEmpty running and queue - negotiationNeededStateQueue -)