Skip to content

Commit 80f1038

Browse files
committed
allocator optimization: avoid tmp obj's allocation and gc
1 parent d7000b4 commit 80f1038

File tree

4 files changed

+78
-53
lines changed

4 files changed

+78
-53
lines changed

alloc.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,34 +50,41 @@ func NewAllocator() *Allocator {
5050
for k := range alloc.buffers {
5151
i := k
5252
alloc.buffers[k].New = func() interface{} {
53-
return make([]byte, 1<<uint32(i))
53+
b := make([]byte, 1<<uint32(i))
54+
return &b
5455
}
5556
}
5657
return alloc
5758
}
5859

5960
// Get a []byte from pool with most appropriate cap
60-
func (alloc *Allocator) Get(size int) []byte {
61+
func (alloc *Allocator) Get(size int) *[]byte {
6162
if size <= 0 || size > 65536 {
6263
return nil
6364
}
6465

6566
bits := msb(size)
6667
if size == 1<<bits {
67-
return alloc.buffers[bits].Get().([]byte)[:size]
68-
} else {
69-
return alloc.buffers[bits+1].Get().([]byte)[:size]
68+
p := alloc.buffers[bits].Get().(*[]byte)
69+
*p = (*p)[:size]
70+
return p
7071
}
72+
p := alloc.buffers[bits+1].Get().(*[]byte)
73+
*p = (*p)[:size]
74+
return p
7175
}
7276

7377
// Put returns a []byte to pool for future use,
7478
// which the cap must be exactly 2^n
75-
func (alloc *Allocator) Put(buf []byte) error {
76-
bits := msb(cap(buf))
77-
if cap(buf) == 0 || cap(buf) > 65536 || cap(buf) != 1<<bits {
79+
func (alloc *Allocator) Put(p *[]byte) error {
80+
if p == nil {
7881
return errors.New("allocator Put() incorrect buffer size")
7982
}
80-
alloc.buffers[bits].Put(buf)
83+
bits := msb(cap(*p))
84+
if cap(*p) == 0 || cap(*p) > 65536 || cap(*p) != 1<<bits {
85+
return errors.New("allocator Put() incorrect buffer size")
86+
}
87+
alloc.buffers[bits].Put(p)
8188
return nil
8289
}
8390

alloc_test.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,25 @@ func TestAllocGet(t *testing.T) {
3232
if alloc.Get(0) != nil {
3333
t.Fatal(0)
3434
}
35-
if len(alloc.Get(1)) != 1 {
35+
if len(*alloc.Get(1)) != 1 {
3636
t.Fatal(1)
3737
}
38-
if len(alloc.Get(2)) != 2 {
38+
if len(*alloc.Get(2)) != 2 {
3939
t.Fatal(2)
4040
}
41-
if len(alloc.Get(3)) != 3 || cap(alloc.Get(3)) != 4 {
41+
if len(*alloc.Get(3)) != 3 || cap(*alloc.Get(3)) != 4 {
4242
t.Fatal(3)
4343
}
44-
if len(alloc.Get(4)) != 4 {
44+
if len(*alloc.Get(4)) != 4 {
4545
t.Fatal(4)
4646
}
47-
if len(alloc.Get(1023)) != 1023 || cap(alloc.Get(1023)) != 1024 {
47+
if len(*alloc.Get(1023)) != 1023 || cap(*alloc.Get(1023)) != 1024 {
4848
t.Fatal(1023)
4949
}
50-
if len(alloc.Get(1024)) != 1024 {
50+
if len(*alloc.Get(1024)) != 1024 {
5151
t.Fatal(1024)
5252
}
53-
if len(alloc.Get(65536)) != 65536 {
53+
if len(*alloc.Get(65536)) != 65536 {
5454
t.Fatal(65536)
5555
}
5656
if alloc.Get(65537) != nil {
@@ -63,19 +63,24 @@ func TestAllocPut(t *testing.T) {
6363
if err := alloc.Put(nil); err == nil {
6464
t.Fatal("put nil misbehavior")
6565
}
66-
if err := alloc.Put(make([]byte, 3, 3)); err == nil {
66+
b := make([]byte, 3)
67+
if err := alloc.Put(&b); err == nil {
6768
t.Fatal("put elem:3 []bytes misbehavior")
6869
}
69-
if err := alloc.Put(make([]byte, 4, 4)); err != nil {
70+
b = make([]byte, 4)
71+
if err := alloc.Put(&b); err != nil {
7072
t.Fatal("put elem:4 []bytes misbehavior")
7173
}
72-
if err := alloc.Put(make([]byte, 1023, 1024)); err != nil {
74+
b = make([]byte, 1023, 1024)
75+
if err := alloc.Put(&b); err != nil {
7376
t.Fatal("put elem:1024 []bytes misbehavior")
7477
}
75-
if err := alloc.Put(make([]byte, 65536, 65536)); err != nil {
78+
b = make([]byte, 65536)
79+
if err := alloc.Put(&b); err != nil {
7680
t.Fatal("put elem:65536 []bytes misbehavior")
7781
}
78-
if err := alloc.Put(make([]byte, 65537, 65537)); err == nil {
82+
b = make([]byte, 65537)
83+
if err := alloc.Put(&b); err == nil {
7984
t.Fatal("put elem:65537 []bytes misbehavior")
8085
}
8186
}
@@ -85,7 +90,7 @@ func TestAllocPutThenGet(t *testing.T) {
8590
data := alloc.Get(4)
8691
alloc.Put(data)
8792
newData := alloc.Get(4)
88-
if cap(data) != cap(newData) {
93+
if cap(*data) != cap(*newData) {
8994
t.Fatal("different cap while alloc.Get()")
9095
}
9196
}
@@ -95,3 +100,10 @@ func BenchmarkMSB(b *testing.B) {
95100
msb(rand.Int())
96101
}
97102
}
103+
104+
func BenchmarkAlloc(b *testing.B) {
105+
for i := 0; i < b.N; i++ {
106+
pbuf := defaultAllocator.Get(i % 65536)
107+
defaultAllocator.Put(pbuf)
108+
}
109+
}

session.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,17 +411,17 @@ func (s *Session) recvLoop() {
411411
s.streamLock.Unlock()
412412
case cmdPSH: // data frame
413413
if hdr.Length() > 0 {
414-
newbuf := defaultAllocator.Get(int(hdr.Length()))
415-
if written, err := io.ReadFull(s.conn, newbuf); err == nil {
414+
pNewbuf := defaultAllocator.Get(int(hdr.Length()))
415+
if written, err := io.ReadFull(s.conn, *pNewbuf); err == nil {
416416
s.streamLock.Lock()
417417
if stream, ok := s.streams[sid]; ok {
418-
stream.pushBytes(newbuf)
418+
stream.pushBytes(pNewbuf)
419419
// a stream used some token
420420
atomic.AddInt32(&s.bucket, -int32(written))
421421
stream.notifyReadEvent()
422422
} else {
423423
// data directed to a missing/closed stream, recycle the buffer immediately.
424-
defaultAllocator.Put(newbuf)
424+
defaultAllocator.Put(pNewbuf)
425425
}
426426
s.streamLock.Unlock()
427427
} else {

stream.go

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ type stream struct {
4141
id uint32 // Stream identifier
4242
sess *Session
4343

44-
buffers [][]byte // the sequential buffers of stream
45-
heads [][]byte // slice heads of the buffers above, kept for recycle
44+
buffers []*[]byte // the sequential buffers of stream
45+
heads []*[]byte // slice heads of the buffers above, kept for recycle
4646

4747
bufferLock sync.Mutex // Mutex to protect access to buffers
4848
frameSize int // Maximum frame size for the stream
@@ -120,9 +120,10 @@ func (s *stream) tryRead(b []byte) (n int, err error) {
120120
// A critical section to copy data from buffers to
121121
s.bufferLock.Lock()
122122
if len(s.buffers) > 0 {
123-
n = copy(b, s.buffers[0])
124-
s.buffers[0] = s.buffers[0][n:]
125-
if len(s.buffers[0]) == 0 {
123+
n = copy(b, *s.buffers[0])
124+
s.buffers[0] = s.buffers[0]
125+
*s.buffers[0] = (*s.buffers[0])[n:]
126+
if len(*s.buffers[0]) == 0 {
126127
s.buffers[0] = nil
127128
s.buffers = s.buffers[1:]
128129
// full recycle
@@ -154,9 +155,10 @@ func (s *stream) tryReadv2(b []byte) (n int, err error) {
154155
var notifyConsumed uint32
155156
s.bufferLock.Lock()
156157
if len(s.buffers) > 0 {
157-
n = copy(b, s.buffers[0])
158-
s.buffers[0] = s.buffers[0][n:]
159-
if len(s.buffers[0]) == 0 {
158+
n = copy(b, *s.buffers[0])
159+
s.buffers[0] = s.buffers[0]
160+
*s.buffers[0] = (*s.buffers[0])[n:]
161+
if len(*s.buffers[0]) == 0 {
160162
s.buffers[0] = nil
161163
s.buffers = s.buffers[1:]
162164
// full recycle
@@ -214,20 +216,20 @@ func (s *stream) WriteTo(w io.Writer) (n int64, err error) {
214216
}
215217

216218
for {
217-
var buf []byte
219+
var pbuf *[]byte
218220
s.bufferLock.Lock()
219221
if len(s.buffers) > 0 {
220-
buf = s.buffers[0]
222+
pbuf = s.buffers[0]
221223
s.buffers = s.buffers[1:]
222224
s.heads = s.heads[1:]
223225
}
224226
s.bufferLock.Unlock()
225227

226-
if buf != nil {
227-
nw, ew := w.Write(buf)
228+
if pbuf != nil {
229+
nw, ew := w.Write(*pbuf)
228230
// NOTE: WriteTo is a reader, so we need to return tokens here
229-
s.sess.returnTokens(len(buf))
230-
defaultAllocator.Put(buf)
231+
s.sess.returnTokens(len(*pbuf))
232+
defaultAllocator.Put(pbuf)
231233
if nw > 0 {
232234
n += int64(nw)
233235
}
@@ -245,26 +247,30 @@ func (s *stream) WriteTo(w io.Writer) (n int64, err error) {
245247
func (s *stream) writeTov2(w io.Writer) (n int64, err error) {
246248
for {
247249
var notifyConsumed uint32
248-
var buf []byte
250+
var pbuf *[]byte
249251
s.bufferLock.Lock()
250252
if len(s.buffers) > 0 {
251-
buf = s.buffers[0]
253+
pbuf = s.buffers[0]
252254
s.buffers = s.buffers[1:]
253255
s.heads = s.heads[1:]
254256
}
255-
s.numRead += uint32(len(buf))
256-
s.incr += uint32(len(buf))
257-
if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(len(buf)) {
257+
var bufLen uint32
258+
if pbuf != nil {
259+
bufLen = uint32(len(*pbuf))
260+
}
261+
s.numRead += bufLen
262+
s.incr += bufLen
263+
if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == bufLen {
258264
notifyConsumed = s.numRead
259265
s.incr = 0
260266
}
261267
s.bufferLock.Unlock()
262268

263-
if buf != nil {
264-
nw, ew := w.Write(buf)
269+
if pbuf != nil {
270+
nw, ew := w.Write(*pbuf)
265271
// NOTE: WriteTo is a reader, so we need to return tokens here
266-
s.sess.returnTokens(len(buf))
267-
defaultAllocator.Put(buf)
272+
s.sess.returnTokens(len(*pbuf))
273+
defaultAllocator.Put(pbuf)
268274
if nw > 0 {
269275
n += int64(nw)
270276
}
@@ -566,10 +572,10 @@ func (s *stream) RemoteAddr() net.Addr {
566572
}
567573

568574
// pushBytes append buf to buffers
569-
func (s *stream) pushBytes(buf []byte) (written int, err error) {
575+
func (s *stream) pushBytes(pbuf *[]byte) (written int, err error) {
570576
s.bufferLock.Lock()
571-
s.buffers = append(s.buffers, buf)
572-
s.heads = append(s.heads, buf)
577+
s.buffers = append(s.buffers, pbuf)
578+
s.heads = append(s.heads, pbuf)
573579
s.bufferLock.Unlock()
574580
return
575581
}
@@ -578,7 +584,7 @@ func (s *stream) pushBytes(buf []byte) (written int, err error) {
578584
func (s *stream) recycleTokens() (n int) {
579585
s.bufferLock.Lock()
580586
for k := range s.buffers {
581-
n += len(s.buffers[k])
587+
n += len(*s.buffers[k])
582588
defaultAllocator.Put(s.heads[k])
583589
}
584590
s.buffers = nil

0 commit comments

Comments
 (0)