Fix handling of retransmitted cookie-echo #130

Merged
merged 5 commits into from
Jul 15, 2020
29 changes: 17 additions & 12 deletions association.go
@@ -982,29 +982,34 @@ func (a *Association) handleHeartbeat(c *chunkHeartbeat) []*packet {
func (a *Association) handleCookieEcho(c *chunkCookieEcho) []*packet {
state := a.getState()
a.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", a.name, getAssociationStateString(state))
if state != closed && state != cookieWait && state != cookieEchoed {
switch state {
default:
return nil
}
case established:
Member

The current implementation seems to return COOKIE-ACK whenever a COOKIE-ECHO is received after the association is established.
https://tools.ietf.org/html/rfc4960#section-5.2.4 specifies several conditions and actions, and in some cases the COOKIE-ECHO should be silently discarded.

Member Author (@enobufs, Jul 15, 2020)

@at-wat Yes, good finding. The current implementation does not follow the RFC when building a cookie. The cookie is currently a random value, which prevents us from validating the echoed cookie as specified in the RFC.

I am hoping to address this issue in #74 sometime soon, encoding TCB information into the cookie, then validate as the spec says. (This has no effect in WebRTC usage; however, if we provide Listen/Dial semantics, it would be desirable.)

This pull request addresses a different issue: a missing state transition from COOKIE-ECHOED to ESTABLISHED, which happens because a peer already in the ESTABLISHED state does not send COOKIE-ACK. We are seeing connections stall in the real world because of this.

Member

> I am hoping to address this issue in #74 sometime soon, encoding TCB information into the cookie, then validate as the spec says.

Great!
It may be worth documenting the current status of the implementation in the README. If users try to use the library as a full SCTP implementation, the simplified implementation may cause security problems. (Since it runs over DTLS in WebRTC, we don't have to worry about that there.)

Member Author

Hey @at-wat, you just made me realize we at least need to check that the echoed cookie is identical to the original one. Thanks!

Let me add these lines in the "established" case.

		if !bytes.Equal(a.myCookie.cookie, c.cookie) {
			return nil
		}

When the cookie does not match, it will silently be discarded.
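
To illustrate the check proposed above, here is a minimal, self-contained sketch of the comparison. It is not the library's code: `cookieMatches` is a hypothetical helper name, and the byte slices stand in for `a.myCookie.cookie` and `c.cookie` from the diff. The only library call used is `bytes.Equal` from the Go standard library.

```go
package main

import (
	"bytes"
	"fmt"
)

// cookieMatches (hypothetical name) reports whether the echoed cookie is
// byte-for-byte identical to the cookie this endpoint originally issued.
func cookieMatches(issued, echoed []byte) bool {
	return bytes.Equal(issued, echoed)
}

func main() {
	issued := []byte{0xde, 0xad, 0xbe, 0xef}

	fmt.Println(cookieMatches(issued, []byte{0xde, 0xad, 0xbe, 0xef})) // true: identical bytes
	fmt.Println(cookieMatches(issued, []byte{0xde, 0xad, 0xbe}))       // false: truncated
	fmt.Println(cookieMatches(issued, []byte{0xde, 0xad, 0xbe, 0x00})) // false: altered
}
```

One detail that may matter here: `bytes.Equal` treats a nil slice and an empty slice as equal, so an endpoint that has not issued a cookie yet would "match" an empty echoed cookie unless that case is excluded separately.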

Member Author

@at-wat A quick question: you used 'break' inside the switch statement. Go does not require break, but is it something you'd recommend for readability or some other reason? (Just out of curiosity...)

Member

I don't have a strong reason; I just feel it makes it clearer that the case does not fall through. (I mainly used C/C++ before starting Go.)

Member Author

@at-wat I have just added the cookie comparison to the established case.

Member Author

I hear you. I write C/C++ too. ;) But I am trying to think in Go. I removed it in the last commit; hope you don't mind...

Member

It's 100% fine to me!
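
For readers coming from C/C++, the point about `break` discussed above can be shown with a small standalone example. The function name `describe` and the case values are made up for illustration. In Go, each case body ends on its own, and continuing into the next case requires an explicit `fallthrough`.

```go
package main

import "fmt"

// describe records which case bodies ran for a given value.
func describe(n int) []string {
	var ran []string
	switch n {
	case 1:
		ran = append(ran, "one") // ends here: no implicit fallthrough into case 2
	case 2:
		ran = append(ran, "two")
		fallthrough // explicit: continue into the next case body
	case 3:
		ran = append(ran, "three")
	default:
		ran = append(ran, "other")
	}
	return ran
}

func main() {
	fmt.Println(describe(1)) // [one]
	fmt.Println(describe(2)) // [two three]
	fmt.Println(describe(9)) // [other]
}
```

A trailing `break` in a case is therefore legal but redundant, which is why removing it does not change behavior.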

if !bytes.Equal(a.myCookie.cookie, c.cookie) {
return nil
}
case closed, cookieWait, cookieEchoed:
if !bytes.Equal(a.myCookie.cookie, c.cookie) {
return nil
}

if !bytes.Equal(a.myCookie.cookie, c.cookie) {
return nil
}
a.t1Init.stop()
a.storedInit = nil

a.t1Init.stop()
a.storedInit = nil
a.t1Cookie.stop()
a.storedCookieEcho = nil

a.t1Cookie.stop()
a.storedCookieEcho = nil
a.setState(established)
a.handshakeCompletedCh <- nil
}

p := &packet{
verificationTag: a.peerVerificationTag,
sourcePort: a.sourcePort,
destinationPort: a.destinationPort,
chunks: []chunk{&chunkCookieAck{}},
}

a.setState(established)
a.handshakeCompletedCh <- nil
return pack(p)
}
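
Because the old and new lines are interleaved in the hunk above, the following sketch restates the intended post-change control flow in a self-contained form. It is a simplified model, not the library's code: the `assoc` type, the `state` constants, and the boolean return value (true meaning "reply with COOKIE-ACK") are assumptions made for illustration, and timers, stored chunks, and the handshake channel are omitted.

```go
package main

import (
	"bytes"
	"fmt"
)

type state int

const (
	closed state = iota
	cookieWait
	cookieEchoed
	established
	otherState // stands in for every state not handled explicitly
)

// assoc is a simplified stand-in for the Association in the diff.
type assoc struct {
	state    state
	myCookie []byte
}

// handleCookieEcho returns true when a COOKIE-ACK should be sent in reply.
func (a *assoc) handleCookieEcho(echoed []byte) bool {
	switch a.state {
	default:
		return false // not expecting a COOKIE-ECHO: silently discard
	case established:
		// Retransmitted COOKIE-ECHO: the earlier COOKIE-ACK may have been
		// lost, so acknowledge again without repeating the state transition.
		if !bytes.Equal(a.myCookie, echoed) {
			return false
		}
	case closed, cookieWait, cookieEchoed:
		if !bytes.Equal(a.myCookie, echoed) {
			return false
		}
		a.state = established // transition happens only on this path
	}
	return true
}

func main() {
	cookie := []byte("cookie-1")

	// A retransmitted COOKIE-ECHO reaches a peer that is already established.
	a := &assoc{state: established, myCookie: cookie}
	fmt.Println(a.handleCookieEcho(cookie), a.state == established) // true true

	// A mismatched cookie is silently discarded.
	fmt.Println(a.handleCookieEcho([]byte("bogus"))) // false
}
```

In this model the established case is what lets a peer stuck in COOKIE-ECHOED eventually receive its COOKIE-ACK, which is the stall described in the review thread.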

78 changes: 0 additions & 78 deletions association_test.go
@@ -2022,84 +2022,6 @@ func TestAssocDelayedAck(t *testing.T) {

closeAssociationPair(br, a0, a1)
})

t.Run("Second DATA chunk to generate SACK immedidately", func(t *testing.T) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

const si uint16 = 6
var n int
var nPacketsReceived int
var ppi PayloadProtocolIdentifier
sbuf := make([]byte, 4000) // size should be less than initial cwnd (4380)
rbuf := make([]byte, 4000)

_, err := cryptoRand.Read(sbuf)
if !assert.Nil(t, err, "failed to create associations") {
return
}

br := test.NewBridge()

a0, a1, err := createNewAssociationPair(br, ackModeAlwaysDelay, 0)
if !assert.Nil(t, err, "failed to create associations") {
assert.FailNow(t, "failed due to earlier error")
}

s0, s1, err := establishSessionPair(br, a0, a1, si)
assert.Nil(t, err, "failed to establish session pair")

a0.stats.reset()
a1.stats.reset()

// Writes data (will fragmented)
n, err = s0.WriteSCTP(sbuf, PayloadTypeWebRTCBinary)
assert.Nil(t, err, "WriteSCTP failed")
assert.Equal(t, n, len(sbuf), "unexpected length of received data")

// Repeat calling br.Tick() until the buffered amount becomes 0
for s0.BufferedAmount() > 0 {
for {
n = br.Tick()
if n == 0 {
break
}
}

for {
s1.lock.RLock()
readable := s1.reassemblyQueue.isReadable()
s1.lock.RUnlock()
if !readable {
break
}
n, ppi, err = s1.ReadSCTP(rbuf)
if !assert.Nil(t, err, "ReadSCTP failed") {
return
}
assert.Equal(t, len(sbuf), n, "unexpected length of received data")
assert.Equal(t, ppi, PayloadTypeWebRTCBinary, "unexpected ppi")

nPacketsReceived++
}
}

br.Process()

assert.Equal(t, 1, nPacketsReceived, "should be one packet received")
assert.Equal(t, 0, s1.getNumBytesInReassemblyQueue(), "reassembly queue should be empty")

t.Logf("nDATAs : %d\n", a1.stats.getNumDATAs())
t.Logf("nSACKs : %d\n", a0.stats.getNumSACKs())
t.Logf("nAckTimeouts: %d\n", a1.stats.getNumAckTimeouts())

assert.Equal(t, uint64(4), a1.stats.getNumDATAs(), "DATA chunk count mismatch")
assert.True(t, a0.stats.getNumSACKs() < a1.stats.getNumDATAs(), "sack count should less than data")
assert.Equal(t, uint64(0), a1.stats.getNumAckTimeouts(), "ackTimeout count mismatch")
assert.Equal(t, uint64(0), a0.stats.getNumT3Timeouts(), "should be no retransmit")

closeAssociationPair(br, a0, a1)
})
}

func TestAssocReset(t *testing.T) {
157 changes: 147 additions & 10 deletions vnet_test.go
@@ -21,11 +21,13 @@ type vNetEnvConfig struct {
}

type vNetEnv struct {
wan *vnet.Router
net0 *vnet.Net
net1 *vnet.Net
numToDropData int
numToDropReconfig int
wan *vnet.Router
net0 *vnet.Net
net1 *vnet.Net
numToDropData int
numToDropReconfig int
numToDropCookieEcho int
numToDropCookieAck int
}

func (venv *vNetEnv) dropNextDataChunk(numToDrop int) {
@@ -36,6 +38,14 @@ func (venv *vNetEnv) dropNextReconfigChunk(numToDrop int) {
venv.numToDropReconfig = numToDrop
}

func (venv *vNetEnv) dropNextCookieEchoChunk(numToDrop int) {
venv.numToDropCookieEcho = numToDrop
}

func (venv *vNetEnv) dropNextCookieAckChunk(numToDrop int) {
venv.numToDropCookieAck = numToDrop
}

func buildVNetEnv(cfg *vNetEnvConfig) (*vNetEnv, error) {
log := cfg.log

@@ -88,6 +98,20 @@ func buildVNetEnv(cfg *vNetEnvConfig) (*vNetEnv, error) {
log.Infof("Chunk filter: drop RECONFIG %s", chunk.String())
break loop
}
case *chunkCookieEcho:
if venv.numToDropCookieEcho > 0 {
toDrop = true
venv.numToDropCookieEcho--
log.Infof("Chunk filter: drop %s", chunk.String())
break loop
}
case *chunkCookieAck:
if venv.numToDropCookieAck > 0 {
toDrop = true
venv.numToDropCookieAck--
log.Infof("Chunk filter: drop %s", chunk.String())
break loop
}
}
}
return !toDrop
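
The counter-based drop logic in this hunk can be sketched in isolation as follows. This is an illustrative model under stated assumptions: `chunkKind`, `dropFilter`, `dropNext`, and `pass` are hypothetical names, and the real test inspects parsed SCTP chunks inside a vnet chunk filter rather than a slice of enum values. Like the code above, the sketch stops at the first chunk that triggers a drop.

```go
package main

import "fmt"

type chunkKind int

const (
	kindData chunkKind = iota
	kindCookieEcho
	kindCookieAck
)

// dropFilter drops the next N packets that carry a chunk of a configured kind.
// It is not safe for concurrent use; a real filter would need locking.
type dropFilter struct {
	remaining map[chunkKind]int
}

func newDropFilter() *dropFilter {
	return &dropFilter{remaining: map[chunkKind]int{}}
}

// dropNext arms the filter to drop the next n packets containing kind.
func (f *dropFilter) dropNext(kind chunkKind, n int) {
	f.remaining[kind] = n
}

// pass returns true if a packet carrying these chunks should be forwarded.
func (f *dropFilter) pass(chunks []chunkKind) bool {
	for _, k := range chunks {
		if f.remaining[k] > 0 {
			f.remaining[k]--
			return false // drop the whole packet on the first armed chunk
		}
	}
	return true
}

func main() {
	f := newDropFilter()
	f.dropNext(kindCookieEcho, 1)
	f.dropNext(kindCookieAck, 1)

	fmt.Println(f.pass([]chunkKind{kindCookieEcho})) // false: first COOKIE-ECHO dropped
	fmt.Println(f.pass([]chunkKind{kindCookieEcho})) // true: retransmission passes
	fmt.Println(f.pass([]chunkKind{kindCookieAck}))  // false: first COOKIE-ACK dropped
	fmt.Println(f.pass([]chunkKind{kindData}))       // true: never armed
}
```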
@@ -183,7 +207,7 @@ func testRwndFull(t *testing.T, unordered bool) {
}
defer assoc.Close() // nolint:errcheck

log.Info("server handlshake complete")
log.Info("server handshake complete")
close(serverHandshakeDone)

stream, err := assoc.AcceptStream()
@@ -257,7 +281,7 @@ func testRwndFull(t *testing.T, unordered bool) {
}
defer assoc.Close() // nolint:errcheck

log.Info("client handlshake complete")
log.Info("client handshake complete")
close(clientHandshakeDone)

stream, err := assoc.OpenStream(777, PayloadTypeWebRTCBinary)
@@ -360,7 +384,7 @@ func TestRwndFull(t *testing.T) {
}

func testStreamClose(t *testing.T, dropReconfig bool) {
lim := test.TimeOut(time.Second * 5)
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

loggerFactory := logging.NewDefaultLoggerFactory()
@@ -408,7 +432,7 @@ func testStreamClose(t *testing.T, dropReconfig bool) {
}
defer assoc.Close() // nolint:errcheck

log.Info("server handlshake complete")
log.Info("server handshake complete")

stream, err := assoc.AcceptStream()
if !assert.NoError(t, err, "should succeed") {
@@ -457,7 +481,7 @@ func testStreamClose(t *testing.T, dropReconfig bool) {
}
defer assoc.Close() // nolint:errcheck

log.Info("client handlshake complete")
log.Info("client handshake complete")

stream, err := assoc.OpenStream(777, PayloadTypeWebRTCBinary)
if !assert.NoError(t, err, "should succeed") {
@@ -544,3 +568,116 @@ func TestStreamClose(t *testing.T) {
testStreamClose(t, true)
})
}

// this test case reproduces the issue mentioned in
// https://github.com/pion/webrtc/issues/1270#issuecomment-653953743
// and confirms the fix.
// To reproduce the case mentioned above:
// * Use simultaneous-open (SCTP)
// * Drop both of the first COOKIE-ECHO and COOKIE-ACK
func TestCookieEchoRetransmission(t *testing.T) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

loggerFactory := logging.NewDefaultLoggerFactory()
log := loggerFactory.NewLogger("test")

venv, err := buildVNetEnv(&vNetEnvConfig{
minDelay: 200 * time.Millisecond,
loggerFactory: loggerFactory,
log: log,
})
if !assert.NoError(t, err, "should succeed") {
return
}
if !assert.NotNil(t, venv, "should not be nil") {
return
}
defer venv.wan.Stop() // nolint:errcheck

// To cause the cookie echo retransmission, both COOKIE-ECHO
// and COOKIE-ACK chunks need to be dropped at the same time.
venv.dropNextCookieEchoChunk(1)
venv.dropNextCookieAckChunk(1)

serverHandshakeDone := make(chan struct{})
clientHandshakeDone := make(chan struct{})
waitAllHandshakeDone := make(chan struct{})
clientShutDown := make(chan struct{})
serverShutDown := make(chan struct{})

maxReceiveBufferSize := uint32(64 * 1024)

// Go routine for Server
go func() {
defer close(serverShutDown)
// connected UDP conn for server
conn, err := venv.net0.DialUDP("udp4",
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: 5000},
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: 5000},
)
if !assert.NoError(t, err, "should succeed") {
return
}
defer conn.Close() // nolint:errcheck

// server association
// using Client for simultaneous open
assoc, err := Client(Config{
NetConn: conn,
MaxReceiveBufferSize: maxReceiveBufferSize,
LoggerFactory: loggerFactory,
})
if !assert.NoError(t, err, "should succeed") {
return
}
defer assoc.Close() // nolint:errcheck

log.Info("server handshake complete")
close(serverHandshakeDone)
<-waitAllHandshakeDone
}()

// Go routine for Client
go func() {
defer close(clientShutDown)
// connected UDP conn for client
conn, err := venv.net1.DialUDP("udp4",
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: 5000},
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: 5000},
)
if !assert.NoError(t, err, "should succeed") {
return
}

// client association
assoc, err := Client(Config{
NetConn: conn,
MaxReceiveBufferSize: maxReceiveBufferSize,
LoggerFactory: loggerFactory,
})
if !assert.NoError(t, err, "should succeed") {
return
}
defer assoc.Close() // nolint:errcheck

log.Info("client handshake complete")
close(clientHandshakeDone)
<-waitAllHandshakeDone
}()

//
// Scenario
//

// wait until both handshake complete
<-clientHandshakeDone
<-serverHandshakeDone
close(waitAllHandshakeDone)

log.Info("handshake complete")

<-clientShutDown
<-serverShutDown
log.Info("all done")
}
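
The synchronization used by the test above (each peer closes its own "handshake done" channel, then blocks on a shared channel until the scenario releases everyone) can be sketched on its own. The function `runPeers` and its variable names are hypothetical; the sketch uses only the standard `sync` package and models a handshake as an immediate success.

```go
package main

import (
	"fmt"
	"sync"
)

// runPeers starts n goroutines, waits until each one signals that its
// (simulated) handshake is done, then releases all of them at once.
// It returns how many goroutines ran to completion.
func runPeers(n int) int {
	done := make([]chan struct{}, n)
	release := make(chan struct{})

	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for i := range done {
		done[i] = make(chan struct{})
		wg.Add(1)
		go func(handshakeDone chan struct{}) {
			defer wg.Done()
			close(handshakeDone) // signal: this peer's handshake completed
			<-release            // keep the peer alive until all are ready
			mu.Lock()
			finished++
			mu.Unlock()
		}(done[i])
	}

	for _, ch := range done {
		<-ch // wait for every peer's handshake
	}
	close(release) // closing wakes every waiting goroutine at once
	wg.Wait()
	return finished
}

func main() {
	fmt.Println(runPeers(2)) // 2
}
```

Closing a channel, rather than sending on it, is what allows a single signal to wake any number of waiters, so neither peer tears down its association before the other has finished its handshake.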