Skip to content

Commit

Permalink
lock only once for kcpInput()
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Jan 16, 2017
1 parent fa77ab6 commit 0941a27
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,18 +546,25 @@ func (s *UDPSession) notifyWriteEvent() {
}

func (s *UDPSession) kcpInput(data []byte) {
kcpInErrors := uint64(0)
fecErrs := uint64(0)
fecRecovered := uint64(0)

if s.fec != nil {
f := s.fec.decode(data)
s.mu.Lock()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
}
}

if f.flag == typeData || f.flag == typeFEC {
if f.flag == typeFEC {
atomic.AddUint64(&DefaultSnmp.FECSegs, 1)
}

if recovers := s.fec.input(f); recovers != nil {
s.mu.Lock()
kcpInErrors := uint64(0)
fecErrs := uint64(0)
fecRecovered := uint64(0)
for _, r := range recovers {
if len(r) >= 2 { // must be larger than 2bytes
sz := binary.LittleEndian.Uint16(r)
Expand All @@ -574,38 +581,43 @@ func (s *UDPSession) kcpInput(data []byte) {
fecErrs++
}
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
}
}
if f.flag == typeData {
s.mu.Lock()
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
}
s.mu.Unlock()

// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
} else {
s.mu.Lock()
if ret := s.kcp.Input(data, true); ret != 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
}

// notify reader
s.mu.Lock()
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.InSegs, 1)
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
}
if fecErrs > 0 {
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
}
if fecRecovered > 0 {
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
}
}

func (s *UDPSession) receiver(ch chan []byte) {
Expand Down

0 comments on commit 0941a27

Please sign in to comment.