diff --git a/kcp.go b/kcp.go index d595bbe3..c14d4f80 100644 --- a/kcp.go +++ b/kcp.go @@ -687,7 +687,7 @@ func (kcp *KCP) flush() { } // sliding window, controlled by snd_nxt && sna_una+cwnd - count := 0 + newSegsCount := 0 for k := range kcp.snd_queue { if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 { break @@ -695,24 +695,13 @@ func (kcp *KCP) flush() { newseg := kcp.snd_queue[k] newseg.conv = kcp.conv newseg.cmd = IKCP_CMD_PUSH - newseg.wnd = seg.wnd - newseg.ts = current newseg.sn = kcp.snd_nxt - newseg.una = kcp.rcv_nxt - newseg.resendts = newseg.ts - newseg.rto = kcp.rx_rto kcp.snd_buf = append(kcp.snd_buf, newseg) kcp.snd_nxt++ - count++ + newSegsCount++ kcp.snd_queue[k].data = nil } - kcp.snd_queue = kcp.snd_queue[count:] - - // flag pending data - hasPending := false - if count > 0 { - hasPending = true - } + kcp.snd_queue = kcp.snd_queue[newSegsCount:] // calculate resent resent := uint32(kcp.fastresend) @@ -720,18 +709,39 @@ func (kcp *KCP) flush() { resent = 0xffffffff } - // flush data segments + // counters var lostSegs, fastRetransSegs, earlyRetransSegs uint64 - for k := range kcp.snd_buf { + + // send new segments + for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ { + current := currentMs() + segment := &kcp.snd_buf[k] + segment.xmit++ + segment.rto = kcp.rx_rto + segment.resendts = current + segment.rto + segment.ts = current + segment.wnd = seg.wnd + segment.una = kcp.rcv_nxt + + size := len(buffer) - len(ptr) + need := IKCP_OVERHEAD + len(segment.data) + + if size+need > int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer + } + + ptr = segment.encode(ptr) + copy(ptr, segment.data) + ptr = ptr[len(segment.data):] + } + + // check for retransmissions + for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ { current := currentMs() segment := &kcp.snd_buf[k] needsend := false - if segment.xmit == 0 { - needsend = true - segment.xmit++ - segment.rto = kcp.rx_rto - segment.resendts = current + segment.rto - } else if _itimediff(current, segment.resendts) >= 0 { + if _itimediff(current, segment.resendts) >= 0 { // RTO needsend = true segment.xmit++ kcp.xmit++ @@ -753,7 +763,7 @@ func (kcp *KCP) flush() { change++ fastRetransSegs++ } - } else if segment.fastack > 0 && !hasPending { // early retransmit + } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit lastsend := segment.resendts - segment.rto if _itimediff(current, lastsend) >= kcp.rx_srtt>>2 { needsend = true @@ -788,17 +798,18 @@ func (kcp *KCP) flush() { } } - atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs) - atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs) - atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs) - atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs) - // flash remain segments size := len(buffer) - len(ptr) if size > 0 { kcp.output(buffer, size) } + // counter updates + atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs) + atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs) + atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs) + atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs) + // update ssthresh // rate halving, https://tools.ietf.org/html/rfc6937 if change != 0 {