Skip to content

Commit

Permalink
automatic cork/uncork with channel, make critical section smaller
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Jul 27, 2024
1 parent 63a6ecd commit 651dc4a
Showing 1 changed file with 88 additions and 65 deletions.
153 changes: 88 additions & 65 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (

// accept backlog
acceptBacklog = 128

// max latency for consecutive FEC encoding, in millisecond
maxFECEncodeLatency = 500
)

var (
Expand Down Expand Up @@ -116,7 +119,8 @@ type (
nonce Entropy

// packets waiting to be sent on wire
txqueue []ipv4.Message
chPostProcessing chan []byte

xconn batchConn // for x/net
xconnWriteError error

Expand Down Expand Up @@ -146,6 +150,7 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.chWriteEvent = make(chan struct{}, 1)
sess.chSocketReadError = make(chan struct{})
sess.chSocketWriteError = make(chan struct{})
sess.chPostProcessing = make(chan []byte, acceptBacklog)
sess.remote = remote
sess.conn = conn
sess.ownConn = ownConn
Expand Down Expand Up @@ -184,11 +189,24 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
// A basic check for the minimum packet size
if size >= IKCP_OVERHEAD+sess.headerSize {
sess.output(buf[:size])
// make a copy
bts := xmitBuf.Get().([]byte)[:size]
copy(bts, buf)

// delivery to post processing
select {
case sess.chPostProcessing <- bts:
case <-sess.die:
return
}

}
})
sess.kcp.ReserveBytes(sess.headerSize)

// create post-processing goroutine
go sess.postProcess()

if sess.l == nil { // it's a client connection
go sess.readLoop()
atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
Expand Down Expand Up @@ -329,11 +347,10 @@ RESET_TIMER:

waitsnd = s.kcp.WaitSnd()
if waitsnd >= int(s.kcp.snd_wnd) || waitsnd >= int(s.kcp.rmt_wnd) || !s.writeDelay {
s.kcp.flush(false)
// uncork put the packets on wire immediately if the inflight window is full
// put the packets on wire immediately if the inflight window is full
// or if we've specified write no delay(NO merging of outgoing bytes)
// we don't have to wait until the periodical update() procedure uncorks.
s.uncork()
s.kcp.flush(false)
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
Expand All @@ -360,27 +377,6 @@ RESET_TIMER:
}
}

// uncork sends data in txqueue if there is any
// in kcp-go, the transmission pipeline is illustrated as follows:
//
// User sess.Write() => KCP ARQ output => FEC encoding => CRC32 integrity => Encryption => TxQueue
//
// the cork is to prevent the syscall function 'sendmmsg' from being sent immediately, which is a
// system call that sends multiple packets in one syscall. It's a trade-off between latency and throughput.
//
// Calling uncork will send all the pending packets in txqueue to the underlying connection immediately.
func (s *UDPSession) uncork() {
if len(s.txqueue) > 0 {
s.tx(s.txqueue)
// recycle
for k := range s.txqueue {
xmitBuf.Put(s.txqueue[k].Buffers[0])
s.txqueue[k].Buffers = nil
}
s.txqueue = s.txqueue[:0]
}
}

// Close closes the connection.
func (s *UDPSession) Close() error {
var once bool
Expand All @@ -395,7 +391,6 @@ func (s *UDPSession) Close() error {
// try best to send all queued messages especially the data in txqueue
s.mu.Lock()
s.kcp.flush(false)
s.uncork()

// release pending segments to recyle memory
s.kcp.ReleaseTX()
Expand Down Expand Up @@ -571,48 +566,79 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
return errInvalidOperation
}

// a goroutine to handle post processing of kcp and make the critical section smaller
// pipeline for outgoing packets (from ARQ to network)
//
// KCP output -> FEC encoding -> CRC32 integrity -> Encryption -> TxQueue
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
func (s *UDPSession) postProcess() {
txqueue := make([]ipv4.Message, 0, acceptBacklog)
chCork := make(chan struct{}, 1)

// 1. FEC encoding
if s.fecEncoder != nil {
ecc = s.fecEncoder.encode(buf, s.kcp.rx_rto)
}
for {
select {
case buf := <-s.chPostProcessing: // dequeue from post processing
var ecc [][]byte

// 2&3. crc32 & encryption
if s.block != nil {
s.nonce.Fill(buf[:nonceSize])
checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
s.block.Encrypt(buf, buf)

for k := range ecc {
s.nonce.Fill(ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
// 1. FEC encoding
if s.fecEncoder != nil {
ecc = s.fecEncoder.encode(buf, maxFECEncodeLatency)
}

// 4. TxQueue
var msg ipv4.Message
for i := 0; i < s.dup+1; i++ {
bts := xmitBuf.Get().([]byte)[:len(buf)]
copy(bts, buf)
msg.Buffers = [][]byte{bts}
msg.Addr = s.remote
s.txqueue = append(s.txqueue, msg)
}
// 2&3. crc32 & encryption
if s.block != nil {
s.nonce.Fill(buf[:nonceSize])
checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
s.block.Encrypt(buf, buf)

for k := range ecc {
s.nonce.Fill(ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}

// 4. TxQueue
var msg ipv4.Message
for i := 0; i < s.dup+1; i++ {
bts := xmitBuf.Get().([]byte)[:len(buf)]
copy(bts, buf)
msg.Buffers = [][]byte{bts}
msg.Addr = s.remote
txqueue = append(txqueue, msg)
}

for k := range ecc {
bts := xmitBuf.Get().([]byte)[:len(ecc[k])]
copy(bts, ecc[k])
msg.Buffers = [][]byte{bts}
msg.Addr = s.remote
txqueue = append(txqueue, msg)
}

for k := range ecc {
bts := xmitBuf.Get().([]byte)[:len(ecc[k])]
copy(bts, ecc[k])
msg.Buffers = [][]byte{bts}
msg.Addr = s.remote
s.txqueue = append(s.txqueue, msg)
// notify chCork only when chPostProcessing is empty
if len(s.chPostProcessing) == 0 {
select {
case chCork <- struct{}{}:
default:
}
}

case <-chCork: // emulate a corked socket
if len(txqueue) > 0 {
s.tx(txqueue)
// recycle
for k := range txqueue {
xmitBuf.Put(txqueue[k].Buffers[0])
txqueue[k].Buffers = nil
}
txqueue = txqueue[:0]
}

case <-s.die:
return
}
}
}

Expand All @@ -627,7 +653,6 @@ func (s *UDPSession) update() {
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
// self-synchronized timed scheduling
SystemTimedSched.Put(s.update, time.Now().Add(time.Duration(interval)*time.Millisecond))
Expand Down Expand Up @@ -767,7 +792,6 @@ func (s *UDPSession) kcpInput(data []byte) {
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
Expand All @@ -784,7 +808,6 @@ func (s *UDPSession) kcpInput(data []byte) {
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
}

Expand Down

0 comments on commit 651dc4a

Please sign in to comment.