Skip to content

Commit

Permalink
always send new segments before retransmission checks
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Feb 20, 2017
1 parent 2909e3c commit 6fe6030
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,51 +687,61 @@ func (kcp *KCP) flush() {
}

// sliding window, controlled by snd_nxt && sna_una+cwnd
count := 0
newSegsCount := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
}
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.wnd = seg.wnd
newseg.ts = current
newseg.sn = kcp.snd_nxt
newseg.una = kcp.rcv_nxt
newseg.resendts = newseg.ts
newseg.rto = kcp.rx_rto
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
count++
newSegsCount++
kcp.snd_queue[k].data = nil
}
kcp.snd_queue = kcp.snd_queue[count:]

// flag pending data
hasPending := false
if count > 0 {
hasPending = true
}
kcp.snd_queue = kcp.snd_queue[newSegsCount:]

// calculate resent
resent := uint32(kcp.fastresend)
if kcp.fastresend <= 0 {
resent = 0xffffffff
}

// flush data segments
// counters
var lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {

// send new segments
for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ {
current := currentMs()
segment := &kcp.snd_buf[k]
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
segment.ts = current
segment.wnd = seg.wnd
segment.una = kcp.rcv_nxt

size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)

if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}

ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]
}

// check for retransmissions
for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ {
current := currentMs()
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 {
needsend = true
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 {
if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
segment.xmit++
kcp.xmit++
Expand All @@ -753,7 +763,7 @@ func (kcp *KCP) flush() {
change++
fastRetransSegs++
}
} else if segment.fastack > 0 && !hasPending { // early retransmit
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
lastsend := segment.resendts - segment.rto
if _itimediff(current, lastsend) >= kcp.rx_srtt>>2 {
needsend = true
Expand Down Expand Up @@ -788,17 +798,18 @@ func (kcp *KCP) flush() {
}
}

atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)

// flash remain segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}

// counter updates
atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)

// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change != 0 {
Expand Down

0 comments on commit 6fe6030

Please sign in to comment.