Skip to content

Commit

Permalink
the allocator can now be enabled per request
Browse files Browse the repository at this point in the history
Other minor changes as per review comments
  • Loading branch information
drakkan committed Mar 17, 2020
1 parent 7168541 commit 6908603
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 100 deletions.
12 changes: 6 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ script:
- go test -integration -v ./...
- go test -testserver -v ./...
- go test -integration -testserver -v ./...
- go test -integration -optimized-allocator -v ./...
- go test -testserver -optimized-allocator -v ./...
- go test -integration -testserver -optimized-allocator -v ./...
- go test -integration -allocator -v ./...
- go test -testserver -allocator -v ./...
- go test -integration -testserver -allocator -v ./...
- go test -race -integration -v ./...
- go test -race -testserver -v ./...
- go test -race -integration -testserver -v ./...
- go test -race -integration -optimized-allocator -v ./...
- go test -race -testserver -optimized-allocator -v ./...
- go test -race -integration -optimized-allocator -testserver -v ./...
- go test -race -integration -allocator -v ./...
- go test -race -testserver -allocator -v ./...
- go test -race -integration -allocator -testserver -v ./...
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
integration:
go test -integration -v
go test -testserver -v
go test -integration -testserver -v
go test -integration -optimized-allocator -v ./...
go test -testserver -optimized-allocator -v ./...
go test -integration -testserver -optimized-allocator -v ./...
go test -integration -v ./...
go test -testserver -v ./...
go test -integration -testserver -v ./...
go test -integration -allocator -v ./...
go test -testserver -allocator -v ./...
go test -integration -testserver -allocator -v ./...

integration_w_race:
go test -race -integration -v
go test -race -testserver -v
go test -race -integration -testserver -v
go test -race -integration -optimized-allocator -v ./...
go test -race -testserver -optimized-allocator -v ./...
go test -race -integration -optimized-allocator -testserver -v ./...
go test -race -integration -v ./...
go test -race -testserver -v ./...
go test -race -integration -testserver -v ./...
go test -race -integration -allocator -v ./...
go test -race -testserver -allocator -v ./...
go test -race -integration -allocator -testserver -v ./...


19 changes: 8 additions & 11 deletions allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
)

type allocator struct {
sync.Mutex
available [][]byte
// map key is the request order
used map[uint32][][]byte
sync.Mutex
}

func newAllocator() *allocator {
return &allocator{
available: nil,
// micro optimization: initialize available pages with an initial capacity, 100 should be enough
available: make([][]byte, 0, 100),
used: make(map[uint32][][]byte),
}
}
Expand All @@ -27,7 +28,7 @@ func (a *allocator) GetPage(requestOrderID uint32) []byte {

var result []byte

// get an available page and remove it from the available ones
// get an available page and remove it from the available ones.
if len(a.available) > 0 {
truncLength := len(a.available) - 1
result = a.available[truncLength]
Expand All @@ -52,15 +53,13 @@ func (a *allocator) ReleasePages(requestOrderID uint32) {
a.Lock()
defer a.Unlock()

if used, ok := a.used[requestOrderID]; ok && len(used) > 0 {
if used := a.used[requestOrderID]; len(used) > 0 {
a.available = append(a.available, used...)
// this is probably useless
a.used[requestOrderID] = nil
}
delete(a.used, requestOrderID)
}

// Free removes all the used and free pages.
// Free removes all the used and available pages.
// Call this method when the allocator is not needed anymore
func (a *allocator) Free() {
a.Lock()
Expand Down Expand Up @@ -92,8 +91,6 @@ func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
a.Lock()
defer a.Unlock()

if _, ok := a.used[requestOrderID]; ok {
return true
}
return false
_, ok := a.used[requestOrderID]
return ok
}
1 change: 0 additions & 1 deletion allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
)

// I like the full flow of the test here, but probably I will be asked to split it in separate test cases
func TestAllocator(t *testing.T) {
allocator := newAllocator()
// get a page for request order id 1
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (c *Client) nextID() uint32 {
}

func (c *Client) recvVersion() error {
typ, data, err := c.recvPacket(nil, 0)
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
type conn struct {
io.Reader
io.WriteCloser
// this is the same allocator used in packet manager
alloc *allocator
sync.Mutex // used to serialise writes to sendPacket
// sendPacketTest is needed to replicate packet issues in testing
sendPacketTest func(w io.Writer, m encoding.BinaryMarshaler) error
}

// the allocator and the orderID are used in server mode if AllocationModeOptimized is enabled.
// For the client just pass nil and 0
func (c *conn) recvPacket(alloc *allocator, orderID uint32) (uint8, []byte, error) {
return recvPacket(c, alloc, orderID)
// the orderID is used in server mode if the allocator is enabled.
// For the client mode just pass 0
func (c *conn) recvPacket(orderID uint32) (uint8, []byte, error) {
return recvPacket(c, c.alloc, orderID)
}

func (c *conn) sendPacket(m encoding.BinaryMarshaler) error {
Expand Down Expand Up @@ -78,7 +80,7 @@ func (c *clientConn) recv() error {
c.conn.Close()
}()
for {
typ, data, err := c.recvPacket(nil, 0)
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
Expand Down
7 changes: 2 additions & 5 deletions packet-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type packetManager struct {
sender packetSender // connection object
working *sync.WaitGroup
packetCount uint32
// it is not nil if AllocationModeOptimized is enabled
// it is not nil if the allocator is enabled
alloc *allocator
}

Expand All @@ -36,9 +36,6 @@ func newPktMgr(sender packetSender) *packetManager {
sender: sender,
working: &sync.WaitGroup{},
}
if enabledAllocationMode == AllocationModeOptimized {
s.alloc = newAllocator()
}
go s.controller()
return s
}
Expand All @@ -50,7 +47,7 @@ func (s *packetManager) newOrderID() uint32 {
}

// returns the next orderID without incrementing it.
// This is used before receiving a new packet in AllocationModeOptimized to associate
// This is used before receiving a new packet, with the allocator enabled, to associate
// the slice allocated for the received packet with the orderID that will be used to mark
// the allocated slices for reuse once the request is served
func (s *packetManager) getNextOrderID() uint32 {
Expand Down
24 changes: 13 additions & 11 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,24 @@ func recvPacket(r io.Reader, alloc *allocator, orderID uint32) (uint8, []byte, e
if _, err := io.ReadFull(r, b[:4]); err != nil {
return 0, nil, err
}
l, _ := unmarshalUint32(b)
if l > maxMsgLength {
debug("recv packet %d bytes too long", l)
length, _ := unmarshalUint32(b)
if length > maxMsgLength {
debug("recv packet %d bytes too long", length)
return 0, nil, errLongPacket
}
if alloc == nil {
b = make([]byte, l)
b = make([]byte, length)
}
if _, err := io.ReadFull(r, b[0:l]); err != nil {
debug("recv packet %d bytes: err %v", l, err)
if _, err := io.ReadFull(r, b[:length]); err != nil {
debug("recv packet %d bytes: err %v", length, err)
return 0, nil, err
}
if debugDumpRxPacketBytes {
debug("recv packet: %s %d bytes %x", fxp(b[0]), l, b[1:l])
debug("recv packet: %s %d bytes %x", fxp(b[0]), length, b[1:length])
} else if debugDumpRxPacket {
debug("recv packet: %s %d bytes", fxp(b[0]), l)
debug("recv packet: %s %d bytes", fxp(b[0]), length)
}
return b[0], b[1:l], nil
return b[0], b[1:length], nil
}

type extensionPair struct {
Expand Down Expand Up @@ -593,11 +593,13 @@ func (p *sshFxpReadPacket) UnmarshalBinary(b []byte) error {

func (p *sshFxpReadPacket) getDataSlice(alloc *allocator, orderID uint32) []byte {
dataLen := clamp(p.Len, maxTxPacket)
// we allocate a slice with a bigger capacity so we avoid a new allocation in sshFxpDataPacket.MarshalBinary
// and in sendPacket, we need 9 bytes in MarshalBinary and 4 bytes in sendPacket.
if alloc != nil {
// GetPage returns a slice with capacity = maxMsgLength this is enough to avoid new allocations in
// sshFxpDataPacket.MarshalBinary and sendPacket
return alloc.GetPage(orderID)[:dataLen]
}
// we allocate a slice with a bigger capacity so we avoid a new allocation in sshFxpDataPacket.MarshalBinary
// and in sendPacket, we need 9 bytes in MarshalBinary and 4 bytes in sendPacket.
return make([]byte, dataLen, dataLen+9+4)
}

Expand Down
44 changes: 38 additions & 6 deletions request-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,50 @@ type RequestServer struct {
}

// NewRequestServer creates/allocates/returns new RequestServer.
// Normally there there will be one server per user-session.
// Normally there will be one server per user-session.
func NewRequestServer(rwc io.ReadWriteCloser, h Handlers) *RequestServer {
rs, _ := NewRequestServerWithOptions(rwc, h)
return rs
}

// A RequestServerOption is a function which applies configuration to a RequestServer.
type RequestServerOption func(*RequestServer) error

// WithRSAllocator enable the allocator.
// After processing a packet we keep in memory the allocated slices
// and we reuse them for new packets.
// The allocator is experimental
func WithRSAllocator() RequestServerOption {
return func(rs *RequestServer) error {
alloc := newAllocator()
rs.pktMgr.alloc = alloc
rs.conn.alloc = alloc
return nil
}
}

// NewRequestServerWithOptions creates/allocates/returns new RequestServer adding the specified options
// If options is nil or empty this is equivalent to NewRequestServer
func NewRequestServerWithOptions(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) (*RequestServer, error) {
svrConn := &serverConn{
conn: conn{
Reader: rwc,
WriteCloser: rwc,
},
}
return &RequestServer{
rs := &RequestServer{
serverConn: svrConn,
Handlers: h,
pktMgr: newPktMgr(svrConn),
openRequests: make(map[string]*Request),
}

for _, o := range options {
if err := o(rs); err != nil {
return nil, err
}
}
return rs, nil
}

// New Open packet/Request
Expand Down Expand Up @@ -88,6 +118,11 @@ func (rs *RequestServer) Close() error { return rs.conn.Close() }

// Serve requests for user session
func (rs *RequestServer) Serve() error {
defer func() {
if rs.pktMgr.alloc != nil {
rs.pktMgr.alloc.Free()
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
Expand All @@ -107,7 +142,7 @@ func (rs *RequestServer) Serve() error {
var pktType uint8
var pktBytes []byte
for {
pktType, pktBytes, err = rs.recvPacket(rs.pktMgr.alloc, rs.pktMgr.getNextOrderID())
pktType, pktBytes, err = rs.serverConn.recvPacket(rs.pktMgr.getNextOrderID())
if err != nil {
// we don't care about releasing allocated pages here, the server will quit and the allocator freed
break
Expand Down Expand Up @@ -150,9 +185,6 @@ func (rs *RequestServer) Serve() error {
delete(rs.openRequests, handle)
req.close()
}
if rs.pktMgr.alloc != nil {
rs.pktMgr.alloc.Free()
}

return err
}
Expand Down
7 changes: 6 additions & 1 deletion request-server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func clientRequestServerPair(t *testing.T) *csPair {
fd, err := l.Accept()
assert.Nil(t, err)
handlers := InMemHandler()
server = NewRequestServer(fd, handlers)
if *testAllocator {
options := []RequestServerOption{WithRSAllocator()}
server, _ = NewRequestServerWithOptions(fd, handlers, options...)
} else {
server = NewRequestServer(fd, handlers)
}
server.Serve()
}()
<-ready
Expand Down
Loading

0 comments on commit 6908603

Please sign in to comment.