add optional allocator #344

Merged
merged 5 commits · Jul 16, 2020
Changes from 2 commits
10 changes: 8 additions & 2 deletions .travis.yml
@@ -4,8 +4,8 @@ go_import_path: github.com/pkg/sftp
# current and previous stable releases, plus tip
# remember to exclude previous and tip for macs below
go:
- 1.12.x
- 1.13.x
- 1.14.x
- tip

os:
@@ -15,7 +15,7 @@ os:
matrix:
exclude:
- os: osx
go: 1.12.x
go: 1.13.x
- os: osx
go: tip

@@ -35,6 +35,12 @@ script:
- go test -integration -v ./...
- go test -testserver -v ./...
- go test -integration -testserver -v ./...
- go test -integration -optimized-allocator -v ./...
- go test -testserver -optimized-allocator -v ./...
- go test -integration -testserver -optimized-allocator -v ./...
- go test -race -integration -v ./...
- go test -race -testserver -v ./...
- go test -race -integration -testserver -v ./...
- go test -race -integration -optimized-allocator -v ./...
- go test -race -testserver -optimized-allocator -v ./...
- go test -race -integration -optimized-allocator -testserver -v ./...
6 changes: 6 additions & 0 deletions Makefile
@@ -2,10 +2,16 @@ integration:
go test -integration -v
go test -testserver -v
go test -integration -testserver -v
go test -integration -optimized-allocator -v ./...
go test -testserver -optimized-allocator -v ./...
go test -integration -testserver -optimized-allocator -v ./...

integration_w_race:
go test -race -integration -v
go test -race -testserver -v
go test -race -integration -testserver -v
go test -race -integration -optimized-allocator -v ./...
go test -race -testserver -optimized-allocator -v ./...
go test -race -integration -optimized-allocator -testserver -v ./...
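The new -optimized-allocator runs above exercise the allocator-backed mode in CI and in the Makefile targets. The wiring of that flag is not part of this diff; a minimal sketch of how it might look, assuming enabledAllocationMode is a settable package-level variable (the AllocationModeOptimized constant does appear in the packet-manager.go changes below), could be:

package sftp

import (
	"flag"
	"os"
	"testing"
)

// Hypothetical test wiring: only the flag name comes from the CI commands above;
// the PR's actual test setup is not shown in this diff.
var optimizedAllocator = flag.Bool("optimized-allocator", false, "run the tests with AllocationModeOptimized")

func TestMain(m *testing.M) {
	flag.Parse()
	if *optimizedAllocator {
		enabledAllocationMode = AllocationModeOptimized
	}
	os.Exit(m.Run())
}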


99 changes: 99 additions & 0 deletions allocator.go
@@ -0,0 +1,99 @@
package sftp

import (
"sync"
)

type allocator struct {
available [][]byte
// map key is the request order
used map[uint32][][]byte
sync.Mutex
}

func newAllocator() *allocator {
return &allocator{
available: nil,
used: make(map[uint32][][]byte),
}
}

// GetPage returns a previously allocated and unused []byte or creates a new one.
// The returned slice has a fixed size of maxMsgLength, which is suitable both for
// receiving new packets and for reading the files to serve.
func (a *allocator) GetPage(requestOrderID uint32) []byte {
a.Lock()
defer a.Unlock()

var result []byte

// get an available page and remove it from the available ones
if len(a.available) > 0 {
truncLength := len(a.available) - 1
result = a.available[truncLength]

a.available[truncLength] = nil // clear out the internal pointer
a.available = a.available[:truncLength] // truncate the slice
}

// no preallocated slice found, just allocate a new one
if result == nil {
result = make([]byte, maxMsgLength)
}

// put result in used pages
a.used[requestOrderID] = append(a.used[requestOrderID], result)

return result
}

// ReleasePages marks all the pages in use for the given requestOrderID as available for reuse
func (a *allocator) ReleasePages(requestOrderID uint32) {
a.Lock()
defer a.Unlock()

if used, ok := a.used[requestOrderID]; ok && len(used) > 0 {
a.available = append(a.available, used...)
// clearing the reference here is probably redundant, since the entry is deleted just below
a.used[requestOrderID] = nil
}
delete(a.used, requestOrderID)
}

// Free releases all the used and available pages.
// Call this method when the allocator is no longer needed
func (a *allocator) Free() {
a.Lock()
defer a.Unlock()

a.available = nil
a.used = make(map[uint32][][]byte)
}

func (a *allocator) countUsedPages() int {
a.Lock()
defer a.Unlock()

num := 0
for _, p := range a.used {
num += len(p)
}
return num
}

func (a *allocator) countAvailablePages() int {
a.Lock()
defer a.Unlock()

return len(a.available)
}

func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
a.Lock()
defer a.Unlock()

if _, ok := a.used[requestOrderID]; ok {
return true
}
return false
}
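For orientation, the intended per-request lifecycle is: take one or more pages under a request order ID while serving it, release them once the response is sent, and call Free when the allocator (i.e. the connection) is done. A sketch only, since the real call sites are recvPacket and the request server rather than a helper like this:

// allocatorLifecycleSketch illustrates the GetPage/ReleasePages pairing per orderID.
func allocatorLifecycleSketch(a *allocator, orderID uint32) {
	recvBuf := a.GetPage(orderID) // page used to receive the incoming packet
	readBuf := a.GetPage(orderID) // page used to read file data for the response
	_, _ = recvBuf, readBuf       // ... serve the request ...
	// after the response for this orderID is sent, both pages return to the pool
	a.ReleasePages(orderID)
}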
136 changes: 136 additions & 0 deletions allocator_test.go
@@ -0,0 +1,136 @@
package sftp

import (
"strconv"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

// I like the full flow of the test here, but I will probably be asked to split it into separate test cases
func TestAllocator(t *testing.T) {
allocator := newAllocator()
// get a page for request order id 1
page := allocator.GetPage(1)
page[1] = uint8(1)
assert.Equal(t, maxMsgLength, len(page))
assert.Equal(t, 1, allocator.countUsedPages())
// get another page for request order id 1, we now have 2 used pages
page = allocator.GetPage(1)
page[0] = uint8(2)
assert.Equal(t, 2, allocator.countUsedPages())
// get another page for request order id 1, we now have 3 used pages
page = allocator.GetPage(1)
page[2] = uint8(3)
assert.Equal(t, 3, allocator.countUsedPages())
// release the page for request order id 1, we now have 3 available pages
allocator.ReleasePages(1)
assert.NotContains(t, allocator.used, 1)
assert.Equal(t, 3, allocator.countAvailablePages())
// get a page for request order id 2
// we get the latest released page, let's verify that by checking the previously written values
// so we are sure we are reusing a previously allocated page
page = allocator.GetPage(2)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 1, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(2), page[0])
assert.Equal(t, 1, allocator.countAvailablePages())
assert.Equal(t, 2, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(1), page[1])
// we now have 3 used pages for request order id 2 and no available pages
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// release some request order id with no allocated pages, should have no effect
allocator.ReleasePages(1)
allocator.ReleasePages(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// now get some pages for another request order id
allocator.GetPage(3)
// we now must have 3 used pages for request order id 2 and 1 used page for request order id 3
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 4, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// get another page for request order id 3
allocator.GetPage(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 5, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// now release the pages for request order id 3
allocator.ReleasePages(3)
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
assert.False(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be not used")
// again check that we are reusing previously allocated pages.
// We wrote nothing to the last 2 requested pages, so release them and get the third one
allocator.ReleasePages(2)
assert.Equal(t, 5, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
assert.False(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be not used")
allocator.GetPage(4)
allocator.GetPage(4)
page = allocator.GetPage(4)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(4), "page with request order id 4 must be used")
// free the allocator
allocator.Free()
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
}

func BenchmarkAllocatorSerial(b *testing.B) {
allocator := newAllocator()
for i := 0; i < b.N; i++ {
benchAllocator(allocator, uint32(i))
}
}

func BenchmarkAllocatorParallel(b *testing.B) {
var counter uint32
allocator := newAllocator()
for i := 1; i <= 8; i *= 2 {
b.Run(strconv.Itoa(i), func(b *testing.B) {
b.SetParallelism(i)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
benchAllocator(allocator, atomic.AddUint32(&counter, 1))
}
})
})
}
}

func benchAllocator(allocator *allocator, requestOrderID uint32) {
// simulates the page requested in recvPacket
allocator.GetPage(requestOrderID)
// simulates the page requested in fileget for downloads
allocator.GetPage(requestOrderID)
// release the allocated pages
allocator.ReleasePages(requestOrderID)
}

// useful for debugging
func printAllocatorContents(allocator *allocator) {
for o, u := range allocator.used {
debug("used order id: %v, values: %+v", o, u)
}
for _, v := range allocator.available {
debug("available, values: %+v", v)
}
}
2 changes: 1 addition & 1 deletion client.go
@@ -214,7 +214,7 @@ func (c *Client) nextID() uint32 {
}

func (c *Client) recvVersion() error {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(nil, 0)
if err != nil {
return err
}
8 changes: 5 additions & 3 deletions conn.go
@@ -18,8 +18,10 @@ type conn struct {
sendPacketTest func(w io.Writer, m encoding.BinaryMarshaler) error
}

func (c *conn) recvPacket() (uint8, []byte, error) {
return recvPacket(c)
// The allocator and the orderID are used in server mode if AllocationModeOptimized is enabled.
// For the client, just pass nil and 0.
func (c *conn) recvPacket(alloc *allocator, orderID uint32) (uint8, []byte, error) {
return recvPacket(c, alloc, orderID)
}

func (c *conn) sendPacket(m encoding.BinaryMarshaler) error {
@@ -76,7 +78,7 @@ func (c *clientConn) recv() error {
c.conn.Close()
}()
for {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(nil, 0)
if err != nil {
return err
}
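The matching change inside recvPacket itself (packet.go) is not shown in this diff. Based on the comment above, the allocation branch is presumably something like the following sketch, simplified and not the PR's actual code: reuse a pooled page when an allocator is provided, otherwise allocate a fresh buffer as before.

package sftp

import (
	"encoding/binary"
	"errors"
	"io"
)

// recvPacketSketch is an illustration only. With an allocator (server in
// AllocationModeOptimized) the buffer comes from the pool; without one
// (client passes nil, 0) it is freshly allocated.
func recvPacketSketch(r io.Reader, alloc *allocator, orderID uint32) (uint8, []byte, error) {
	var b []byte
	if alloc != nil {
		b = alloc.GetPage(orderID)
	} else {
		b = make([]byte, maxMsgLength)
	}
	if _, err := io.ReadFull(r, b[:4]); err != nil {
		return 0, nil, err
	}
	length := binary.BigEndian.Uint32(b[:4])
	if length == 0 || length > uint32(len(b)) {
		return 0, nil, errors.New("invalid packet length")
	}
	if _, err := io.ReadFull(r, b[:length]); err != nil {
		return 0, nil, err
	}
	// first byte is the packet type, the rest is the payload
	return b[0], b[1:length], nil
}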
17 changes: 17 additions & 0 deletions packet-manager.go
@@ -18,6 +18,8 @@ type packetManager struct {
sender packetSender // connection object
working *sync.WaitGroup
packetCount uint32
// alloc is not nil if AllocationModeOptimized is enabled
alloc *allocator
}

type packetSender interface {
@@ -34,6 +36,9 @@ func newPktMgr(sender packetSender) *packetManager {
sender: sender,
working: &sync.WaitGroup{},
}
if enabledAllocationMode == AllocationModeOptimized {
s.alloc = newAllocator()
}
go s.controller()
return s
}
@@ -44,6 +49,14 @@ func (s *packetManager) newOrderID() uint32 {
return s.packetCount
}

// getNextOrderID returns the next orderID without incrementing it.
// It is used before receiving a new packet in AllocationModeOptimized, so that the
// slice allocated for the received packet can be associated with the orderID that will
// later be used to mark the allocated slices for reuse once the request is served
func (s *packetManager) getNextOrderID() uint32 {
return s.packetCount + 1
}

type orderedRequest struct {
requestPacket
orderid uint32
@@ -174,6 +187,10 @@ func (s *packetManager) maybeSendPackets() {
if in.orderID() == out.orderID() {
debug("Sending packet: %v", out.id())
s.sender.sendPacket(out.(encoding.BinaryMarshaler))
if s.alloc != nil {
// mark for reuse the slices allocated for this request
s.alloc.ReleasePages(in.orderID())
}
// pop off heads
copy(s.incoming, s.incoming[1:]) // shift left
s.incoming[len(s.incoming)-1] = nil // clear last
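Putting the packet-manager pieces together: the server receive loop (also outside this diff) presumably reserves the orderID before reading, so that the pages handed out by GetPage during recvPacket are tracked under the same ID that maybeSendPackets later passes to ReleasePages. A rough sketch, with the handler left abstract and only the recvPacket signature and packetManager fields taken from the diff:

package sftp

// receiveLoopSketch is illustrative only; the loop structure and handler are placeholders.
func receiveLoopSketch(c *conn, pktMgr *packetManager, handle func(uint8, []byte) error) error {
	for {
		// reserve the orderID the next request will be queued under...
		orderID := pktMgr.getNextOrderID()
		// ...so the pages allocated while receiving it are tracked under that ID
		typ, data, err := c.recvPacket(pktMgr.alloc, orderID)
		if err != nil {
			return err
		}
		// the handler queues the request via newOrderID(); once the response is
		// sent, maybeSendPackets calls alloc.ReleasePages(orderID) as shown above
		if err := handle(typ, data); err != nil {
			return err
		}
	}
}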